Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 128 additions & 4 deletions crates/protocol/derive/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
/// Identifier for the pipeline origin gauge.
pub const PIPELINE_ORIGIN: &str = "kona_derive_pipeline_origin";

/// Identifier for the latest l2 block the pipeline stepped on.
pub const PIPELINE_STEP_BLOCK: &str = "kona_derive_pipeline_step_block";

/// Identifier for if the batch reader is set.
pub const PIPELINE_BATCH_READER_SET: &str = "kona_derive_batch_reader_set";

/// Identifier to track the amount of time it takes to advance the pipeline origin.
pub const PIPELINE_ORIGIN_ADVANCE: &str = "kona_derive_pipeline_origin_advance";

Expand Down Expand Up @@ -37,10 +43,52 @@

/// Identifier for the batch stream stage batch memory overhead gauge.
pub const PIPELINE_BATCH_MEM: &str = "kona_derive_batch_mem";

/// Identifier for the size of batches read by the channel reader.
pub const PIPELINE_READ_BATCHES: &str = "kona_derive_read_batches";

/// Identifier for the gauge that tracks the number of pipeline steps.
pub const PIPELINE_STEPS: &str = "kona_derive_pipeline_steps";

/// Identifier for the gauge that tracks the number of prepared attributes.
pub const PIPELINE_PREPARED_ATTRIBUTES: &str = "kona_derive_prepared_attributes";

/// Identifier tracking the number of pipeline signals.
pub const PIPELINE_SIGNALS: &str = "kona_derive_pipeline_signals";

/// Identifier that tracks the batch validator l1 blocks start.
pub const PIPELINE_L1_BLOCKS_START: &str = "kona_derive_l1_blocks_start";

/// Identifier that tracks the batch validator l1 blocks end.
pub const PIPELINE_L1_BLOCKS_END: &str = "kona_derive_l1_blocks_end";

/// Identifier to track the size of the current derived span batch.
pub const PIPELINE_DERIVED_SPAN_SIZE: &str = "kona_derive_span_size";

/// Identifier to track the number of transactions in the latest derived payload attributes.
pub const PIPELINE_LATEST_PAYLOAD_TX_COUNT: &str = "kona_derive_payload_tx_count";

/// Identifier for the data availability provider data.
pub const PIPELINE_DATA_AVAILABILITY_PROVIDER: &str = "kona_derive_dap_sources";

/// Identifier for a gauge that tracks batch validity.
pub const PIPELINE_BATCH_VALIDITY: &str = "kona_derive_batch_validity";

/// Identifier for the histogram that tracks the amount of time it takes to validate a
/// span batch.
pub const PIPELINE_CHECK_BATCH_PREFIX: &str = "kona_derive_check_batch_prefix_duration";

/// Identifier for the histogram that tracks the amount of time it takes to build payload
/// attributes.
pub const PIPELINE_ATTRIBUTES_BUILD_DURATION: &str = "kona_derive_attributes_build_duration";

/// Identifier for the gauge that tracks the number of payload attributes buffered in the
/// pipeline.
pub const PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER: &str = "kona_derive_payload_attributes_buffer";
}

impl Metrics {
/// Initializes metrics for the P2P stack.
/// Initializes metrics.
///
/// This does two things:
/// * Describes various metrics.
Expand All @@ -51,13 +99,41 @@
Self::zero();
}

/// Describes metrics used in [`kona_p2p`][crate].
/// Describes metrics.
#[cfg(feature = "metrics")]
pub fn describe() {
metrics::describe_gauge!(
Self::PIPELINE_ORIGIN,
"The block height of the pipeline l1 origin"
);
metrics::describe_gauge!(
Self::PIPELINE_BATCH_VALIDITY,
"The validity of the batch being processed",
);
metrics::describe_gauge!(
Self::PIPELINE_DATA_AVAILABILITY_PROVIDER,
"The source of pipeline data"
);
metrics::describe_gauge!(
Self::PIPELINE_DERIVED_SPAN_SIZE,
"The number of payload attributes in the current span"
);
metrics::describe_gauge!(
Self::PIPELINE_LATEST_PAYLOAD_TX_COUNT,
"The number of transactions in the latest derived payload attributes"
);
metrics::describe_gauge!(Self::PIPELINE_READ_BATCHES, "The read batches");
metrics::describe_gauge!(Self::PIPELINE_BATCH_READER_SET, "If the batch reader is set");
metrics::describe_gauge!(Self::PIPELINE_L1_BLOCKS_START, "Earliest l1 blocks height");
metrics::describe_gauge!(Self::PIPELINE_L1_BLOCKS_END, "Latest l1 blocks height");
metrics::describe_gauge!(
Self::PIPELINE_STEP_BLOCK,
"The latest L2 block height that the pipeline stepped on"
);
metrics::describe_histogram!(
Self::PIPELINE_CHECK_BATCH_PREFIX,
"The time it takes to validate a span batch"
);

Check warning on line 136 in crates/protocol/derive/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/protocol/derive/src/metrics/mod.rs#L109-L136

Added lines #L109 - L136 were not covered by tests
metrics::describe_histogram!(
Self::PIPELINE_ORIGIN_ADVANCE,
"The amount of time it takes to advance the pipeline origin"
Expand Down Expand Up @@ -98,13 +174,61 @@
Self::PIPELINE_BATCH_MEM,
"The memory size of batches held in the batch stream stage"
);
metrics::describe_gauge!(
Self::PIPELINE_STEPS,
"The total number of pipeline steps on the derivation pipeline"
);
metrics::describe_gauge!(
Self::PIPELINE_PREPARED_ATTRIBUTES,
"The total number of prepared attributes generated by the derivation pipeline"
);
metrics::describe_gauge!(
Self::PIPELINE_SIGNALS,
"Number of times the pipeline has been signalled"
);
metrics::describe_histogram!(
Self::PIPELINE_ATTRIBUTES_BUILD_DURATION,
"The time it takes to build payload attributes"
);
metrics::describe_gauge!(
Self::PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER,
"The number of payload attributes buffered in the pipeline"
);

Check warning on line 196 in crates/protocol/derive/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/protocol/derive/src/metrics/mod.rs#L177-L196

Added lines #L177 - L196 were not covered by tests
}

/// Initializes metrics to 0 so they can be queried immediately.
#[cfg(feature = "metrics")]
pub fn zero() {
kona_macros::set!(gauge, Self::PIPELINE_FRAME_QUEUE_BUFFER, 0);
// The batch reader is by default not set.
kona_macros::set!(gauge, Self::PIPELINE_BATCH_READER_SET, 0);

// No source data is initially read.
kona_macros::set!(gauge, Self::PIPELINE_DATA_AVAILABILITY_PROVIDER, "source", "blobs", 0);
kona_macros::set!(
gauge,
Self::PIPELINE_DATA_AVAILABILITY_PROVIDER,
"source",
"calldata",
0
);

// Pipeline signals start at zero.
kona_macros::set!(gauge, Self::PIPELINE_SIGNALS, "type", "reset", 0);
kona_macros::set!(gauge, Self::PIPELINE_SIGNALS, "type", "activation", 0);
kona_macros::set!(gauge, Self::PIPELINE_SIGNALS, "type", "flush_channel", 0);

// No batches are initially read.
kona_macros::set!(gauge, Self::PIPELINE_READ_BATCHES, "type", "single", 0);
kona_macros::set!(gauge, Self::PIPELINE_READ_BATCHES, "type", "span", 0);

// Cumulative counters start at zero.
kona_macros::set!(gauge, Self::PIPELINE_STEPS, 0);
kona_macros::set!(gauge, Self::PIPELINE_PREPARED_ATTRIBUTES, 0);

// All buffers can be zeroed out since they are expected to return to zero.
kona_macros::set!(gauge, Self::PIPELINE_BATCH_BUFFER, 0);

Check warning on line 229 in crates/protocol/derive/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/protocol/derive/src/metrics/mod.rs#L202-L229

Added lines #L202 - L229 were not covered by tests
kona_macros::set!(gauge, Self::PIPELINE_CHANNEL_BUFFER, 0);
kona_macros::set!(gauge, Self::PIPELINE_CHANNEL_TIMEOUT, 0);
kona_macros::set!(gauge, Self::PIPELINE_FRAME_QUEUE_BUFFER, 0);
kona_macros::set!(gauge, Self::PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER, 0);

Check warning on line 232 in crates/protocol/derive/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/protocol/derive/src/metrics/mod.rs#L231-L232

Added lines #L231 - L232 were not covered by tests
}
}
35 changes: 35 additions & 0 deletions crates/protocol/derive/src/pipeline/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ where
type Item = OpAttributesWithParent;

fn next(&mut self) -> Option<Self::Item> {
kona_macros::set!(
gauge,
crate::metrics::Metrics::PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER,
self.prepared.len().saturating_sub(1) as f64
);
self.prepared.pop_front()
}
}
Expand Down Expand Up @@ -117,6 +122,11 @@ where
self.attributes.signal(signal).await?;
}
}
kona_macros::inc!(
gauge,
crate::metrics::Metrics::PIPELINE_SIGNALS,
"type" => signal.to_string(),
);
Ok(())
}
}
Expand Down Expand Up @@ -161,10 +171,35 @@ where
///
/// [PipelineError]: crate::errors::PipelineError
async fn step(&mut self, cursor: L2BlockInfo) -> StepResult {
kona_macros::inc!(gauge, crate::metrics::Metrics::PIPELINE_STEPS);
kona_macros::set!(
gauge,
crate::metrics::Metrics::PIPELINE_STEP_BLOCK,
cursor.block_info.number as f64
);
match self.attributes.next_attributes(cursor).await {
Ok(a) => {
trace!(target: "pipeline", "Prepared L2 attributes: {:?}", a);
kona_macros::inc!(
gauge,
crate::metrics::Metrics::PIPELINE_PAYLOAD_ATTRIBUTES_BUFFER
);
kona_macros::set!(
gauge,
crate::metrics::Metrics::PIPELINE_LATEST_PAYLOAD_TX_COUNT,
a.inner.transactions.as_ref().map_or(0.0, |txs| txs.len() as f64)
);
if !a.is_last_in_span {
kona_macros::inc!(gauge, crate::metrics::Metrics::PIPELINE_DERIVED_SPAN_SIZE);
} else {
kona_macros::set!(
gauge,
crate::metrics::Metrics::PIPELINE_DERIVED_SPAN_SIZE,
0
);
}
self.prepared.push_back(a);
kona_macros::inc!(gauge, crate::metrics::Metrics::PIPELINE_PREPARED_ATTRIBUTES);
StepResult::PreparedAttributes
}
Err(err) => match err {
Expand Down
6 changes: 6 additions & 0 deletions crates/protocol/derive/src/sources/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ where
index += 1;
}
}
#[cfg(feature = "metrics")]
metrics::gauge!(
crate::metrics::Metrics::PIPELINE_DATA_AVAILABILITY_PROVIDER,
"source" => "blobs",
)
.increment(data.len() as f64);
(data, hashes)
}

Expand Down
7 changes: 7 additions & 0 deletions crates/protocol/derive/src/sources/calldata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ impl<CP: ChainProvider + Send> CalldataSource<CP> {
})
.collect::<VecDeque<_>>();

#[cfg(feature = "metrics")]
metrics::gauge!(
crate::metrics::Metrics::PIPELINE_DATA_AVAILABILITY_PROVIDER,
"source" => "calldata",
)
.increment(self.calldata.len() as f64);

self.open = true;

Ok(())
Expand Down
7 changes: 7 additions & 0 deletions crates/protocol/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ where
};

// Construct the payload attributes from the loaded batch.
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();
let attributes = match self.create_next_attributes(batch, parent).await {
Ok(attributes) => attributes,
Err(e) => {
Expand All @@ -88,6 +90,11 @@ where
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
let populated_attributes =
OpAttributesWithParent::new(attributes, parent, origin, self.is_last_in_span);
kona_macros::record!(
histogram,
crate::metrics::Metrics::PIPELINE_ATTRIBUTES_BUILD_DURATION,
start.elapsed().as_secs_f64()
);

// Clear out the local state once payload attributes are prepared.
self.batch = None;
Expand Down
31 changes: 17 additions & 14 deletions crates/protocol/derive/src/stages/batch/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,11 @@ where
})?,
);
}
let batch_count = self.buffer.len() as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_BUFFER, batch_count);
#[cfg(feature = "metrics")]
{
let batch_count = self.buffer.len() as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_BUFFER, batch_count);
let batch_size = std::mem::size_of_val(&self.buffer) as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_MEM, batch_size);
}
let batch_size = std::mem::size_of_val(&self.buffer) as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_MEM, batch_size);
Ok(())
}
}
Expand Down Expand Up @@ -129,14 +127,6 @@ where
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<Batch> {
#[cfg(feature = "metrics")]
{
let batch_count = self.buffer.len() as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_BUFFER, batch_count);
let batch_size = std::mem::size_of_val(&self.buffer) as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_MEM, batch_size);
}

// If the stage is not active, "pass" the next batch
// through this stage to the BatchQueue stage.
if !self.is_active()? {
Expand All @@ -158,6 +148,8 @@ where
match batch_with_inclusion.batch {
Batch::Single(b) => return Ok(Batch::Single(b)),
Batch::Span(b) => {
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();
let (validity, _) = b
.check_batch_prefix(
self.config.as_ref(),
Expand All @@ -167,6 +159,17 @@ where
&mut self.fetcher,
)
.await;
kona_macros::record!(
histogram,
crate::metrics::Metrics::PIPELINE_CHECK_BATCH_PREFIX,
start.elapsed().as_secs_f64()
);

kona_macros::inc!(
gauge,
crate::metrics::Metrics::PIPELINE_BATCH_VALIDITY,
"validity" => validity.to_string(),
);

match validity {
BatchValidity::Accept => self.span = Some(b),
Expand Down
17 changes: 17 additions & 0 deletions crates/protocol/derive/src/stages/batch/batch_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ where
// If the origin of the parent block is not included, we must advance the origin.
}

#[cfg(feature = "metrics")]
{
if let Some(origin) = self.l1_blocks.first() {
kona_macros::set!(
gauge,
crate::metrics::Metrics::PIPELINE_L1_BLOCKS_START,
origin.number as f64
);
let last = self.l1_blocks.last().unwrap_or(origin);
kona_macros::set!(
gauge,
crate::metrics::Metrics::PIPELINE_L1_BLOCKS_END,
last.number as f64
);
}
}

Ok(())
}

Expand Down
Loading
Loading