diff --git a/.github/workflows/rust_ci.yaml b/.github/workflows/rust_ci.yaml index 8a23b5c777..0a18cd7fb7 100644 --- a/.github/workflows/rust_ci.yaml +++ b/.github/workflows/rust_ci.yaml @@ -22,7 +22,7 @@ jobs: cache-on-failure: true - uses: taiki-e/install-action@nextest - name: cargo test - run: cargo nextest run --release --workspace --all --all-features --locked + run: RUST_MIN_STACK=33554432 cargo nextest run --release --workspace --all --all-features --locked cargo-lint: runs-on: ubuntu-latest timeout-minutes: 20 diff --git a/Cargo.lock b/Cargo.lock index a0f8e4c648..4181c7a596 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,7 +65,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -182,7 +182,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -463,7 +463,7 @@ checksum = "4d0f2d905ebd295e7effec65e5f6868d153936130ae718352771de3e7d03c75c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -570,7 +570,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -586,7 +586,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", "syn-solidity", "tiny-keccak", ] @@ -602,7 +602,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", "syn-solidity", ] @@ -876,7 +876,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -887,7 +887,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -914,7 +914,7 @@ checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -973,7 +973,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1089,7 +1089,7 @@ checksum = "523363cbe1df49b68215efdf500b103ac3b0fb4836aed6d15689a076eadb8fff" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1249,7 +1249,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1506,7 +1506,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version 0.4.1", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1526,7 +1526,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", "unicode-xid", ] @@ -1619,7 +1619,7 @@ checksum = "2f9ed6b3789237c8a0c1c505af1c7eb2c560df6186f01b098c3a1064ea532f38" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1791,7 +1791,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2326,7 +2326,7 @@ dependencies = [ "kona-common", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2687,7 +2687,7 @@ checksum = "1bb5c1d8184f13f7d0ccbeeca0def2f9a181bce2624302793005f5ca8aa62e5e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2866,7 +2866,7 @@ checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2987,7 +2987,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3125,7 +3125,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3269,7 +3269,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3361,7 +3361,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3396,7 +3396,7 @@ checksum = "ca414edb151b4c8d125c12566ab0d74dc9cdba36fb80eb7b848c15f495fd32d1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3729,7 +3729,7 @@ checksum = "09cb82b74b4810f07e460852c32f522e979787691b0b7b7439fe473e49d49b2f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3862,9 +3862,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" +checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" [[package]] name = "rustls-webpki" @@ -4022,7 +4022,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -4046,7 +4046,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -4245,7 +4245,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -4317,9 +4317,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.77" +version = "2.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" +checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" dependencies = [ "proc-macro2", "quote", @@ -4335,7 +4335,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -4418,7 +4418,7 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -4428,7 +4428,7 @@ source = "git+https://github.com/quartiq/thiserror?branch=no-std#e779e1b70023cee dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -4541,7 +4541,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -4653,7 +4653,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -4932,7 +4932,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", "wasm-bindgen-shared", ] @@ -4966,7 +4966,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5166,7 +5166,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -5186,7 +5186,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] diff --git a/crates/derive/src/metrics.rs b/crates/derive/src/metrics.rs index 589642f198..3f9d4b9433 100644 --- a/crates/derive/src/metrics.rs +++ b/crates/derive/src/metrics.rs @@ -48,6 +48,14 @@ lazy_static! { &["error"] ).expect("Batch Reader Errors failed to register"); + /// Tracks the number of times the channel queue was detected + /// non-empty during a frame ingestion, and new channel creation + /// was attempted post-holocene. + pub static ref CHANNEL_QUEUE_NON_EMPTY: IntGauge = register_int_gauge!( + "kona_derive_channel_queue_non_empty", + "Number of times a channel was attempted to be created in the channel bank, but the queue is non-empty post-holocene." + ).expect("Channel Queue Non Empty failed to register"); + /// Tracks the compression ratio of batches. pub static ref BATCH_COMPRESSION_RATIO: IntGauge = register_int_gauge!( "kona_derive_batch_compression_ratio", diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index a9a5d24bc8..50a2fdee9f 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -41,13 +41,13 @@ where P: ChannelBankProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, { /// The rollup configuration. - cfg: Arc, + pub cfg: Arc, /// Map of channels by ID. - channels: HashMap, + pub channels: HashMap, /// Channels in FIFO order. - channel_queue: VecDeque, + pub channel_queue: VecDeque, /// The previous stage of the derivation pipeline. - prev: P, + pub prev: P, } impl

ChannelBank

@@ -83,11 +83,25 @@ where let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; // Get the channel for the frame, or create a new one if it doesn't exist. - let current_channel = self.channels.entry(frame.id).or_insert_with(|| { - let channel = Channel::new(frame.id, origin); - self.channel_queue.push_back(frame.id); - channel - }); + let current_channel = match self.channels.get_mut(&frame.id) { + Some(c) => c, + None => { + if self.cfg.is_holocene_active(origin.timestamp) && !self.channel_queue.is_empty() { + // In holocene, channels are strictly ordered. + // If the previous frame is not the last in the channel + // and a starting frame for the next channel arrives, + // the previous channel/frames are removed and a new channel is created. + self.channel_queue.clear(); + + trace!(target: "channel-bank", "[holocene active] clearing non-empty channel queue"); + crate::inc!(CHANNEL_QUEUE_NON_EMPTY); + } + let channel = Channel::new(frame.id, origin); + self.channel_queue.push_back(frame.id); + self.channels.insert(frame.id, channel); + self.channels.get_mut(&frame.id).expect("Channel must be in queue") + } + }; // Check if the channel is not timed out. If it has, ignore the frame. if current_channel.open_block_number() + self.cfg.channel_timeout(origin.timestamp) < @@ -309,11 +323,35 @@ mod tests { assert_eq!(trace_store.lock().iter().filter(|(l, _)| matches!(l, &Level::WARN)).count(), 1); } + #[test] + fn test_holocene_ingest_new_channel_unclosed() { + let frames = [ + // -- First Channel -- + Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 2, data: vec![0xDD; 50], is_last: false }, + // -- Second Channel -- + Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + ]; + let mock = MockChannelBankProvider::new(vec![]); + let rollup_config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut channel_bank = ChannelBank::new(Arc::new(rollup_config), mock); + for frame in frames.iter().take(3) { + channel_bank.ingest_frame(frame.clone()).unwrap(); + } + assert_eq!(channel_bank.channel_queue.len(), 1); + assert_eq!(channel_bank.channel_queue[0], [0xEE; 16]); + // When we ingest the next frame, channel queue will be cleared since the previous + // channel is not closed. This is invalid by Holocene rules. + channel_bank.ingest_frame(frames[3].clone()).unwrap(); + assert_eq!(channel_bank.channel_queue.len(), 1); + assert_eq!(channel_bank.channel_queue[0], [0xFF; 16]); + } + #[test] fn test_ingest_and_prune_channel_bank() { use alloc::vec::Vec; let mut frames: Vec = new_test_frames(100000); - // let data = frames.iter().map(|f| Ok(f)).collect::>>(); let mock = MockChannelBankProvider::new(vec![]); let cfg = Arc::new(RollupConfig::default()); let mut channel_bank = ChannelBank::new(cfg, mock);