diff --git a/crates/protocol/derive/src/metrics/mod.rs b/crates/protocol/derive/src/metrics/mod.rs index 659c708406..83547e43db 100644 --- a/crates/protocol/derive/src/metrics/mod.rs +++ b/crates/protocol/derive/src/metrics/mod.rs @@ -8,6 +8,12 @@ impl Metrics { /// Identifier for the pipeline origin gauge. pub const PIPELINE_ORIGIN: &str = "kona_derive_pipeline_origin"; + /// Identifier for the latest l2 block the pipeline stepped on. + pub const PIPELINE_STEP_BLOCK: &str = "kona_derive_pipeline_step_block"; + + /// Identifier for if the batch reader is set. + pub const PIPELINE_BATCH_READER_SET: &str = "kona_derive_batch_reader_set"; + /// Identifier to track the amount of time it takes to advance the pipeline origin. pub const PIPELINE_ORIGIN_ADVANCE: &str = "kona_derive_pipeline_origin_advance"; @@ -37,10 +43,52 @@ impl Metrics { /// Identifier for the batch stream stage batch memory overhead gauge. pub const PIPELINE_BATCH_MEM: &str = "kona_derive_batch_mem"; + + /// Identifier for the size of batches read by the channel reader. + pub const PIPELINE_READ_BATCHES: &str = "kona_derive_read_batches"; + + /// Identifier for the gauge that tracks the number of pipeline steps. + pub const PIPELINE_STEPS: &str = "kona_derive_pipeline_steps"; + + /// Identifier for the gauge that tracks the number of prepared attributes. + pub const PIPELINE_PREPARED_ATTRIBUTES: &str = "kona_derive_prepared_attributes"; + + /// Identifier tracking the number of pipeline signals. + pub const PIPELINE_SIGNALS: &str = "kona_derive_pipeline_signals"; + + /// Identifier that tracks the batch validator l1 blocks start. + pub const PIPELINE_L1_BLOCKS_START: &str = "kona_derive_l1_blocks_start"; + + /// Identifier that tracks the batch validator l1 blocks end. + pub const PIPELINE_L1_BLOCKS_END: &str = "kona_derive_l1_blocks_end"; + + /// Identifier to track the size of the current derived span batch. + pub const PIPELINE_DERIVED_SPAN_SIZE: &str = "kona_derive_span_size"; + + /// Identifier to track the number of transactions in the latest derived payload attributes. + pub const PIPELINE_LATEST_PAYLOAD_TX_COUNT: &str = "kona_derive_payload_tx_count"; + + /// Identifier for the data availability provider data. + pub const PIPELINE_DATA_AVAILABILITY_PROVIDER: &str = "kona_derive_dap_sources"; + + /// Identifier for a gauge that tracks batch validity. + pub const PIPELINE_BATCH_VALIDITY: &str = "kona_derive_batch_validity"; + + /// Identifier for the histogram that tracks the amount of time it takes to validate a + /// span batch. + pub const PIPELINE_CHECK_BATCH_PREFIX: &str = "kona_derive_check_batch_prefix_duration"; + + /// Identifier for the histogram that tracks the amount of time it takes to build payload + /// attributes. + pub const PIPELINE_ATTRIBUTES_BUILD_DURATION: &str = "kona_derive_attributes_build_duration"; + + /// Identifier for the gauge that tracks the number of payload attributes buffered in the + /// pipeline. + pub const PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER: &str = "kona_derive_payload_attributes_buffer"; } impl Metrics { - /// Initializes metrics for the P2P stack. + /// Initializes metrics. /// /// This does two things: /// * Describes various metrics. @@ -51,13 +99,41 @@ impl Metrics { Self::zero(); } - /// Describes metrics used in [`kona_p2p`][crate]. + /// Describes metrics. #[cfg(feature = "metrics")] pub fn describe() { metrics::describe_gauge!( Self::PIPELINE_ORIGIN, "The block height of the pipeline l1 origin" ); + metrics::describe_gauge!( + Self::PIPELINE_BATCH_VALIDITY, + "The validity of the batch being processed", + ); + metrics::describe_gauge!( + Self::PIPELINE_DATA_AVAILABILITY_PROVIDER, + "The source of pipeline data" + ); + metrics::describe_gauge!( + Self::PIPELINE_DERIVED_SPAN_SIZE, + "The number of payload attributes in the current span" + ); + metrics::describe_gauge!( + Self::PIPELINE_LATEST_PAYLOAD_TX_COUNT, + "The number of transactions in the latest derived payload attributes" + ); + metrics::describe_gauge!(Self::PIPELINE_READ_BATCHES, "The read batches"); + metrics::describe_gauge!(Self::PIPELINE_BATCH_READER_SET, "If the batch reader is set"); + metrics::describe_gauge!(Self::PIPELINE_L1_BLOCKS_START, "Earliest l1 blocks height"); + metrics::describe_gauge!(Self::PIPELINE_L1_BLOCKS_END, "Latest l1 blocks height"); + metrics::describe_gauge!( + Self::PIPELINE_STEP_BLOCK, + "The latest L2 block height that the pipeline stepped on" + ); + metrics::describe_histogram!( + Self::PIPELINE_CHECK_BATCH_PREFIX, + "The time it takes to validate a span batch" + ); metrics::describe_histogram!( Self::PIPELINE_ORIGIN_ADVANCE, "The amount of time it takes to advance the pipeline origin" @@ -98,13 +174,61 @@ impl Metrics { Self::PIPELINE_BATCH_MEM, "The memory size of batches held in the batch stream stage" ); + metrics::describe_gauge!( + Self::PIPELINE_STEPS, + "The total number of pipeline steps on the derivation pipeline" + ); + metrics::describe_gauge!( + Self::PIPELINE_PREPARED_ATTRIBUTES, + "The total number of prepared attributes generated by the derivation pipeline" + ); + metrics::describe_gauge!( + Self::PIPELINE_SIGNALS, + "Number of times the pipeline has been signalled" + ); + metrics::describe_histogram!( + Self::PIPELINE_ATTRIBUTES_BUILD_DURATION, + "The time it takes to build payload attributes" + ); + metrics::describe_gauge!( + Self::PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER, + "The number of payload attributes buffered in the pipeline" + ); } /// Initializes metrics to 0 so they can be queried immediately. #[cfg(feature = "metrics")] pub fn zero() { - kona_macros::set!(gauge, Self::PIPELINE_FRAME_QUEUE_BUFFER, 0); + // The batch reader is by default not set. + kona_macros::set!(gauge, Self::PIPELINE_BATCH_READER_SET, 0); + + // No source data is initially read. + kona_macros::set!(gauge, Self::PIPELINE_DATA_AVAILABILITY_PROVIDER, "source", "blobs", 0); + kona_macros::set!( + gauge, + Self::PIPELINE_DATA_AVAILABILITY_PROVIDER, + "source", + "calldata", + 0 + ); + + // Pipeline signals start at zero. + kona_macros::set!(gauge, Self::PIPELINE_SIGNALS, "type", "reset", 0); + kona_macros::set!(gauge, Self::PIPELINE_SIGNALS, "type", "activation", 0); + kona_macros::set!(gauge, Self::PIPELINE_SIGNALS, "type", "flush_channel", 0); + + // No batches are initially read. + kona_macros::set!(gauge, Self::PIPELINE_READ_BATCHES, "type", "single", 0); + kona_macros::set!(gauge, Self::PIPELINE_READ_BATCHES, "type", "span", 0); + + // Cumulative counters start at zero. + kona_macros::set!(gauge, Self::PIPELINE_STEPS, 0); + kona_macros::set!(gauge, Self::PIPELINE_PREPARED_ATTRIBUTES, 0); + + // All buffers can be zeroed out since they are expected to return to zero. + kona_macros::set!(gauge, Self::PIPELINE_BATCH_BUFFER, 0); kona_macros::set!(gauge, Self::PIPELINE_CHANNEL_BUFFER, 0); - kona_macros::set!(gauge, Self::PIPELINE_CHANNEL_TIMEOUT, 0); + kona_macros::set!(gauge, Self::PIPELINE_FRAME_QUEUE_BUFFER, 0); + kona_macros::set!(gauge, Self::PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER, 0); } } diff --git a/crates/protocol/derive/src/pipeline/core.rs b/crates/protocol/derive/src/pipeline/core.rs index bbccc3e6d7..43be7a34a6 100644 --- a/crates/protocol/derive/src/pipeline/core.rs +++ b/crates/protocol/derive/src/pipeline/core.rs @@ -65,6 +65,11 @@ where type Item = OpAttributesWithParent; fn next(&mut self) -> Option { + kona_macros::set!( + gauge, + crate::metrics::Metrics::PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER, + self.prepared.len().saturating_sub(1) as f64 + ); self.prepared.pop_front() } } @@ -117,6 +122,11 @@ where self.attributes.signal(signal).await?; } } + kona_macros::inc!( + gauge, + crate::metrics::Metrics::PIPELINE_SIGNALS, + "type" => signal.to_string(), + ); Ok(()) } } @@ -161,10 +171,35 @@ where /// /// [PipelineError]: crate::errors::PipelineError async fn step(&mut self, cursor: L2BlockInfo) -> StepResult { + kona_macros::inc!(gauge, crate::metrics::Metrics::PIPELINE_STEPS); + kona_macros::set!( + gauge, + crate::metrics::Metrics::PIPELINE_STEP_BLOCK, + cursor.block_info.number as f64 + ); match self.attributes.next_attributes(cursor).await { Ok(a) => { trace!(target: "pipeline", "Prepared L2 attributes: {:?}", a); + kona_macros::inc!( + gauge, + crate::metrics::Metrics::PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER + ); + kona_macros::set!( + gauge, + crate::metrics::Metrics::PIPELINE_LATEST_PAYLOAD_TX_COUNT, + a.inner.transactions.as_ref().map_or(0.0, |txs| txs.len() as f64) + ); + if !a.is_last_in_span { + kona_macros::inc!(gauge, crate::metrics::Metrics::PIPELINE_DERIVED_SPAN_SIZE); + } else { + kona_macros::set!( + gauge, + crate::metrics::Metrics::PIPELINE_DERIVED_SPAN_SIZE, + 0 + ); + } self.prepared.push_back(a); + kona_macros::inc!(gauge, crate::metrics::Metrics::PIPELINE_PREPARED_ATTRIBUTES); StepResult::PreparedAttributes } Err(err) => match err { diff --git a/crates/protocol/derive/src/sources/blobs.rs b/crates/protocol/derive/src/sources/blobs.rs index 55f358df27..685a904732 100644 --- a/crates/protocol/derive/src/sources/blobs.rs +++ b/crates/protocol/derive/src/sources/blobs.rs @@ -105,6 +105,12 @@ where index += 1; } } + #[cfg(feature = "metrics")] + metrics::gauge!( + crate::metrics::Metrics::PIPELINE_DATA_AVAILABILITY_PROVIDER, + "source" => "blobs", + ) + .increment(data.len() as f64); (data, hashes) } diff --git a/crates/protocol/derive/src/sources/calldata.rs b/crates/protocol/derive/src/sources/calldata.rs index 9e5d5134ca..4d2c4d35ff 100644 --- a/crates/protocol/derive/src/sources/calldata.rs +++ b/crates/protocol/derive/src/sources/calldata.rs @@ -67,6 +67,13 @@ impl CalldataSource { }) .collect::>(); + #[cfg(feature = "metrics")] + metrics::gauge!( + crate::metrics::Metrics::PIPELINE_DATA_AVAILABILITY_PROVIDER, + "source" => "calldata", + ) + .increment(self.calldata.len() as f64); + self.open = true; Ok(()) diff --git a/crates/protocol/derive/src/stages/attributes_queue.rs b/crates/protocol/derive/src/stages/attributes_queue.rs index 5ffc45f9ae..ce73ade744 100644 --- a/crates/protocol/derive/src/stages/attributes_queue.rs +++ b/crates/protocol/derive/src/stages/attributes_queue.rs @@ -79,6 +79,8 @@ where }; // Construct the payload attributes from the loaded batch. + #[cfg(feature = "metrics")] + let start = std::time::Instant::now(); let attributes = match self.create_next_attributes(batch, parent).await { Ok(attributes) => attributes, Err(e) => { @@ -88,6 +90,11 @@ where let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; let populated_attributes = OpAttributesWithParent::new(attributes, parent, origin, self.is_last_in_span); + kona_macros::record!( + histogram, + crate::metrics::Metrics::PIPELINE_ATTRIBUTES_BUILD_DURATION, + start.elapsed().as_secs_f64() + ); // Clear out the local state once payload attributes are prepared. self.batch = None; diff --git a/crates/protocol/derive/src/stages/batch/batch_stream.rs b/crates/protocol/derive/src/stages/batch/batch_stream.rs index 596f4d8929..db89dd0ff4 100644 --- a/crates/protocol/derive/src/stages/batch/batch_stream.rs +++ b/crates/protocol/derive/src/stages/batch/batch_stream.rs @@ -95,13 +95,11 @@ where })?, ); } + let batch_count = self.buffer.len() as f64; + kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_BUFFER, batch_count); #[cfg(feature = "metrics")] - { - let batch_count = self.buffer.len() as f64; - kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_BUFFER, batch_count); - let batch_size = std::mem::size_of_val(&self.buffer) as f64; - kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_MEM, batch_size); - } + let batch_size = std::mem::size_of_val(&self.buffer) as f64; + kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_MEM, batch_size); Ok(()) } } @@ -129,14 +127,6 @@ where parent: L2BlockInfo, l1_origins: &[BlockInfo], ) -> PipelineResult { - #[cfg(feature = "metrics")] - { - let batch_count = self.buffer.len() as f64; - kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_BUFFER, batch_count); - let batch_size = std::mem::size_of_val(&self.buffer) as f64; - kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_MEM, batch_size); - } - // If the stage is not active, "pass" the next batch // through this stage to the BatchQueue stage. if !self.is_active()? { @@ -158,6 +148,8 @@ where match batch_with_inclusion.batch { Batch::Single(b) => return Ok(Batch::Single(b)), Batch::Span(b) => { + #[cfg(feature = "metrics")] + let start = std::time::Instant::now(); let (validity, _) = b .check_batch_prefix( self.config.as_ref(), @@ -167,6 +159,17 @@ where &mut self.fetcher, ) .await; + kona_macros::record!( + histogram, + crate::metrics::Metrics::PIPELINE_CHECK_BATCH_PREFIX, + start.elapsed().as_secs_f64() + ); + + kona_macros::inc!( + gauge, + crate::metrics::Metrics::PIPELINE_BATCH_VALIDITY, + "validity" => validity.to_string(), + ); match validity { BatchValidity::Accept => self.span = Some(b), diff --git a/crates/protocol/derive/src/stages/batch/batch_validator.rs b/crates/protocol/derive/src/stages/batch/batch_validator.rs index b1bc181322..48eba06e3e 100644 --- a/crates/protocol/derive/src/stages/batch/batch_validator.rs +++ b/crates/protocol/derive/src/stages/batch/batch_validator.rs @@ -112,6 +112,23 @@ where // If the origin of the parent block is not included, we must advance the origin. } + #[cfg(feature = "metrics")] + { + if let Some(origin) = self.l1_blocks.first() { + kona_macros::set!( + gauge, + crate::metrics::Metrics::PIPELINE_L1_BLOCKS_START, + origin.number as f64 + ); + let last = self.l1_blocks.last().unwrap_or(origin); + kona_macros::set!( + gauge, + crate::metrics::Metrics::PIPELINE_L1_BLOCKS_END, + last.number as f64 + ); + } + } + Ok(()) } diff --git a/crates/protocol/derive/src/stages/channel/channel_reader.rs b/crates/protocol/derive/src/stages/channel/channel_reader.rs index 4199cdc9f7..2121aac6d8 100644 --- a/crates/protocol/derive/src/stages/channel/channel_reader.rs +++ b/crates/protocol/derive/src/stages/channel/channel_reader.rs @@ -72,6 +72,7 @@ where self.next_batch = Some(BatchReader::new(&channel[..], max_rlp_bytes_per_channel as usize)); + kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_READER_SET, 1); } Ok(()) } @@ -80,6 +81,7 @@ where /// decoding / decompression state to a fresh start. pub fn next_channel(&mut self) { self.next_batch = None; + kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_READER_SET, 0); } } @@ -122,7 +124,14 @@ where .next_batch(self.cfg.as_ref()) .ok_or(PipelineError::NotEnoughData.temp()) { - Ok(batch) => Ok(batch), + Ok(batch) => { + kona_macros::inc!( + gauge, + crate::metrics::Metrics::PIPELINE_READ_BATCHES, + "type" => batch.to_string(), + ); + Ok(batch) + } Err(e) => { self.next_channel(); Err(e) @@ -151,6 +160,7 @@ where // Drop the current in-progress channel. warn!(target: "channel_reader", "Flushed channel"); self.next_batch = None; + kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_READER_SET, 0); } s => { self.prev.signal(s).await?; diff --git a/crates/protocol/derive/src/stages/frame_queue.rs b/crates/protocol/derive/src/stages/frame_queue.rs index 3802d30f61..7b4f0980c0 100644 --- a/crates/protocol/derive/src/stages/frame_queue.rs +++ b/crates/protocol/derive/src/stages/frame_queue.rs @@ -109,20 +109,6 @@ where /// Loads more frames into the [FrameQueue]. pub async fn load_frames(&mut self) -> PipelineResult<()> { - // Update metrics both before and after loading frames to ensure - // the gauge is _decremented_ if the queue is empty. - #[cfg(feature = "metrics")] - { - let queue_len = self.queue.len() as f64; - kona_macros::set!( - gauge, - crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_BUFFER, - queue_len - ); - let queue_size = self.queue.iter().map(Frame::size).sum::() as f64; - kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_MEM, queue_size); - } - // Skip loading frames if the queue is not empty. if !self.queue.is_empty() { return Ok(()); @@ -147,17 +133,14 @@ where // Optimistically extend the queue with the new frames. self.queue.extend(frames); - #[cfg(feature = "metrics")] - { - let queue_len = self.queue.len() as f64; - kona_macros::set!( - gauge, - crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_BUFFER, - queue_len - ); - let queue_size = self.queue.iter().map(Frame::size).sum::() as f64; - kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_MEM, queue_size); - } + // Update metrics with last frame count + kona_macros::set!( + gauge, + crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_BUFFER, + self.queue.len() as f64 + ); + let queue_size = self.queue.iter().map(|f| f.size()).sum::() as f64; + kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_MEM, queue_size); // Prune frames if Holocene is active. let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; diff --git a/crates/protocol/derive/src/types/signals.rs b/crates/protocol/derive/src/types/signals.rs index 9a9b01fc46..a9b47c1e27 100644 --- a/crates/protocol/derive/src/types/signals.rs +++ b/crates/protocol/derive/src/types/signals.rs @@ -19,6 +19,16 @@ pub enum Signal { FlushChannel, } +impl core::fmt::Display for Signal { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Reset(_) => write!(f, "reset"), + Self::Activation(_) => write!(f, "activation"), + Self::FlushChannel => write!(f, "flush_channel"), + } + } +} + impl Signal { /// Sets the [SystemConfig] for the signal. pub const fn with_system_config(self, system_config: SystemConfig) -> Self { diff --git a/crates/protocol/protocol/src/batch/core.rs b/crates/protocol/protocol/src/batch/core.rs index df5ca4c23f..7874b1eb4a 100644 --- a/crates/protocol/protocol/src/batch/core.rs +++ b/crates/protocol/protocol/src/batch/core.rs @@ -17,6 +17,15 @@ pub enum Batch { Span(SpanBatch), } +impl core::fmt::Display for Batch { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Single(_) => write!(f, "single"), + Self::Span(_) => write!(f, "span"), + } + } +} + impl Batch { /// Returns the timestamp for the batch. pub fn timestamp(&self) -> u64 { diff --git a/crates/protocol/protocol/src/batch/validity.rs b/crates/protocol/protocol/src/batch/validity.rs index 1f68d0c52b..cddec2135a 100644 --- a/crates/protocol/protocol/src/batch/validity.rs +++ b/crates/protocol/protocol/src/batch/validity.rs @@ -17,6 +17,18 @@ pub enum BatchValidity { Past, } +impl core::fmt::Display for BatchValidity { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Drop => write!(f, "Drop"), + Self::Accept => write!(f, "Accept"), + Self::Undecided => write!(f, "Undecided"), + Self::Future => write!(f, "Future"), + Self::Past => write!(f, "Past"), + } + } +} + impl BatchValidity { /// Returns whether the batch is accepted. pub const fn is_accept(&self) -> bool { diff --git a/crates/utilities/macros/src/metrics.rs b/crates/utilities/macros/src/metrics.rs index 37603fa6fe..47d9297494 100644 --- a/crates/utilities/macros/src/metrics.rs +++ b/crates/utilities/macros/src/metrics.rs @@ -40,6 +40,10 @@ macro_rules! inc { #[cfg(feature = "metrics")] metrics::$instrument!($metric $(, $label_key $(=> $label_value)?)*).increment(1); }; + ($instrument:ident, $metric:path, $value:expr $(, $label_key:expr $(=> $label_value:expr)?)*$(,)?) => { + #[cfg(feature = "metrics")] + metrics::$instrument!($metric $(, $label_key $(=> $label_value)?)*).increment($value); + }; } /// Records a value, optionally with a specified label.