Skip to content

Commit

Permalink
feat(derive): track the current channel size
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Jun 28, 2024
1 parent 8dee08c commit f1676a1
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 0 deletions.
4 changes: 4 additions & 0 deletions crates/derive/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ macro_rules! timer {
/// Increments a metric with a label value.
#[macro_export]
macro_rules! inc {
($metric:ident) => {
#[cfg(feature = "metrics")]
$crate::metrics::$metric.inc();
};
($metric:ident, $labels:expr) => {
#[cfg(feature = "metrics")]
$crate::metrics::$metric.with_label_values($labels).inc();
Expand Down
6 changes: 6 additions & 0 deletions crates/derive/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ lazy_static! {
"Tracks the L1 origin for the L1 Traversal Stage"
).expect("Origin Gauge failed to register");

/// Tracks the number of frames in the current channel.
pub static ref CURRENT_CHANNEL_FRAMES: IntGauge = register_int_gauge!(
"kona_derive_current_channel_frames",
"Tracks the number of frames in the current channel."
).expect("Current channel frames failed to register");

/// Tracks batch reader errors.
pub static ref BATCH_READER_ERRORS: CounterVec = register_counter_vec!(
"kona_derive_batch_reader_errors",
Expand Down
9 changes: 9 additions & 0 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ where
warn!(target: "channel-bank", "Failed to add frame to channel: {:?}", frame_id);
return Ok(());
}
crate::inc!(CURRENT_CHANNEL_FRAMES);

self.prune()
}
Expand All @@ -122,6 +123,14 @@ where
crate::observe!(CHANNEL_TIMEOUTS, (origin.number - channel.open_block_number()) as f64);
self.channels.remove(&first);
self.channel_queue.pop_front();
crate::set!(
CURRENT_CHANNEL_FRAMES,
self.channel_queue.front().map_or(0, |id| self
.channels
.get(id)
.map_or(0, |c| c.len())
as i64)
);
return Ok(None);
}

Expand Down
10 changes: 10 additions & 0 deletions crates/derive/src/types/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ impl Channel {
Self { id, open_block, inputs: HashMap::new(), ..Default::default() }
}

/// Returns the number of frames ingested.
pub fn len(&self) -> usize {
self.inputs.len()
}

/// Returns if the channel is empty.
pub fn is_empty(&self) -> bool {
self.inputs.is_empty()
}

/// Add a frame to the channel.
///
/// ## Takes
Expand Down

0 comments on commit f1676a1

Please sign in to comment.