diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index bf0d3ba858..8cbb3ff5c4 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -475,4 +475,58 @@ pub(crate) mod tests { let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::Eof.temp()); } + + #[tokio::test] + async fn test_holocene_interleaved_invalid_channel() { + let frames = vec![ + // -- First channel is dropped since it is replaced by the second channel -- + // -- Second channel is dropped since it isn't closed -- + Frame { id: [0x01; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0x02; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0x01; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + Frame { id: [0x02; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + // -- Third Channel - VALID & CLOSED -- + Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + ]; + let encoded = encode_frames(frames.clone()); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); + for frame in frames[4..].iter() { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, *frame); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } + + #[tokio::test] + async fn test_holocene_interleaved_valid_channel() { + let frames = vec![ + // -- First channel is dropped since it is replaced by the second channel -- + // -- Second channel is successfully closed so it's valid -- + Frame { id: [0x01; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0x02; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0x01; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + Frame { id: [0x02; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + // -- Third Channel - VALID & CLOSED -- + Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + ]; + let encoded = encode_frames(frames.clone()); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); + for frame in [&frames[1], &frames[3], &frames[4], &frames[5]].iter() { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, **frame); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } }