diff --git a/Cargo.lock b/Cargo.lock
index 1623cc1d3..c160bac23 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7076,6 +7076,7 @@ dependencies = [
  "tikv-jemallocator",
  "time",
  "tokio",
+ "tokio-metrics",
  "tokio-tungstenite",
  "tokio-util",
  "tower",
@@ -13152,6 +13153,18 @@ dependencies = [
  "syn 2.0.111",
 ]
 
+[[package]]
+name = "tokio-metrics"
+version = "0.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4c2ca6283a34abc77cb58ea83d4a3c40e0162b0095d7a674172a2eed5415197"
+dependencies = [
+ "futures-util",
+ "pin-project-lite",
+ "tokio",
+ "tokio-stream",
+]
+
 [[package]]
 name = "tokio-native-tls"
 version = "0.3.1"
diff --git a/Cargo.toml b/Cargo.toml
index 8c7aed896..6cb0dbfb6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -205,6 +205,7 @@ lazy_static = "1.4.0"
 tikv-jemallocator = { version = "0.6" }
 tracing = "0.1.37"
 metrics = { version = "0.24.1" }
+tokio-metrics = { version = "0.4.7" }
 ahash = "0.8.6"
 time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] }
 vergen = "9.0.4"
diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml
index 6008f33f1..ed2c0d57c 100644
--- a/crates/op-rbuilder/Cargo.toml
+++ b/crates/op-rbuilder/Cargo.toml
@@ -106,6 +106,7 @@ either.workspace = true
 metrics.workspace = true
 serde_json.workspace = true
 tokio-util.workspace = true
+tokio-metrics.workspace = true
 thiserror.workspace = true
 parking_lot.workspace = true
 url.workspace = true
diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs
index e8327aa33..54ad149d8 100644
--- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs
+++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs
@@ -10,6 +10,7 @@ use crate::{
     gas_limiter::AddressGasLimiter,
     metrics::OpRBuilderMetrics,
     primitives::reth::ExecutionInfo,
+    tokio_metrics::FlashblocksTaskMetrics,
     traits::{ClientBounds, PoolBounds},
 };
 use alloy_consensus::{
@@ -187,6 +188,8 @@ pub(super) struct OpPayloadBuilder {
     pub builder_tx: BuilderTx,
     /// Rate limiting based on gas. This is an optional feature.
     pub address_gas_limiter: AddressGasLimiter,
+    /// Tokio task metrics for monitoring spawned tasks.
+    pub task_metrics: Arc<FlashblocksTaskMetrics>,
 }
 
 impl OpPayloadBuilder {
@@ -201,6 +204,7 @@ impl OpPayloadBuilder {
         payload_tx: mpsc::Sender,
         ws_pub: Arc<WebSocketPublisher>,
         metrics: Arc<OpRBuilderMetrics>,
+        task_metrics: Arc<FlashblocksTaskMetrics>,
     ) -> Self {
         let address_gas_limiter = AddressGasLimiter::new(config.gas_limiter_config.clone());
         Self {
@@ -213,6 +217,7 @@ impl OpPayloadBuilder {
             metrics,
             builder_tx,
             address_gas_limiter,
+            task_metrics,
         }
     }
 }
@@ -525,8 +530,10 @@ where
             std::sync::mpsc::sync_channel((self.config.flashblocks_per_block() + 1) as usize);
         let build_at_interval_end = self.config.specific.build_at_interval_end;
 
-        tokio::spawn({
+        tokio::spawn(self.task_metrics.flashblock_timer.instrument({
             let block_cancel = block_cancel.clone();
+            let flashblock_index = ctx.flashblock_index();
+            let block_number = ctx.block_number();
 
             async move {
                 // If NOT building at interval end, send immediate signal to build first
@@ -555,6 +562,13 @@ where
                 loop {
                     tokio::select! {
                        _ = timer.tick() => {
+                            debug!(
+                                target: "payload_builder",
+                                payload_id = ?fb_payload.payload_id,
+                                flashblock_index = flashblock_index,
+                                block_number = block_number,
+                                "Triggering next flashblock with timer",
+                            );
                             // cancel current payload building job
                             fb_cancel.cancel();
                             fb_cancel = block_cancel.child_token();
@@ -590,7 +604,7 @@
                 }
             }
         }
-        });
+        }));
 
         // Process flashblocks - block on async channel receive
         loop {
@@ -598,6 +612,13 @@
             // If build_at_interval_end is false, an immediate signal is sent so we don't wait.
             // If build_at_interval_end is true, we wait for the timer tick (first_flashblock_offset).
             if let Ok(new_fb_cancel) = rx.recv() {
+                debug!(
+                    target: "payload_builder",
+                    payload_id = ?fb_payload.payload_id,
+                    flashblock_index = ctx.flashblock_index(),
+                    block_number = ctx.block_number(),
+                    "Received signal to build flashblock",
+                );
                 ctx = ctx.with_cancel(new_fb_cancel);
             } else {
                 // Channel closed - block building cancelled
diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs
index b0563421a..32a366596 100644
--- a/crates/op-rbuilder/src/builders/flashblocks/service.rs
+++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs
@@ -14,6 +14,7 @@ use crate::{
     },
     flashtestations::service::bootstrap_flashtestations,
     metrics::OpRBuilderMetrics,
+    tokio_metrics::FlashblocksTaskMetrics,
     traits::{NodeBounds, PoolBounds},
 };
 use eyre::WrapErr as _;
@@ -23,7 +24,7 @@
 use reth_node_builder::{BuilderContext, components::PayloadServiceBuilder};
 use reth_optimism_evm::OpEvmConfig;
 use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
 use reth_provider::CanonStateSubscriptions;
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
 pub struct FlashblocksServiceBuilder(pub BuilderConfig);
 
@@ -106,12 +107,16 @@ impl FlashblocksServiceBuilder {
         };
 
         let metrics = Arc::new(OpRBuilderMetrics::default());
+        let task_metrics = Arc::new(FlashblocksTaskMetrics::new());
         let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16);
 
-        let ws_pub: Arc<WebSocketPublisher> =
-            WebSocketPublisher::new(self.0.specific.ws_addr, metrics.clone())
-                .wrap_err("failed to create ws publisher")?
-                .into();
+        let ws_pub: Arc<WebSocketPublisher> = WebSocketPublisher::new(
+            self.0.specific.ws_addr,
+            metrics.clone(),
+            &task_metrics.websocket_publisher,
+        )
+        .wrap_err("failed to create ws publisher")?
+        .into();
 
         let payload_builder = OpPayloadBuilder::new(
             OpEvmConfig::optimism(ctx.chain_spec()),
             pool,
@@ -121,6 +126,7 @@ impl FlashblocksServiceBuilder {
             built_payload_tx,
             ws_pub.clone(),
             metrics.clone(),
+            task_metrics.clone(),
         );
 
         let payload_job_config = BasicPayloadJobGeneratorConfig::default();
@@ -154,13 +160,28 @@ impl FlashblocksServiceBuilder {
             cancel,
         );
 
-        ctx.task_executor()
-            .spawn_critical("custom payload builder service", Box::pin(payload_service));
+        ctx.task_executor().spawn_critical(
+            "custom payload builder service",
+            Box::pin(
+                task_metrics
+                    .payload_builder_service
+                    .instrument(payload_service),
+            ),
+        );
 
         ctx.task_executor().spawn_critical(
             "flashblocks payload handler",
-            Box::pin(payload_handler.run()),
+            Box::pin(
+                task_metrics
+                    .payload_handler
+                    .instrument(payload_handler.run()),
+            ),
         );
 
+        // Spawn the tokio metrics collector (records metrics every second)
+        task_metrics
+            .clone()
+            .spawn_metrics_collector(Duration::from_secs(1));
+
         tracing::info!("Flashblocks payload builder service started");
 
         Ok(payload_builder_handle)
     }
diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs
index acdbf0c28..c733aa58d 100644
--- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs
+++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs
@@ -20,7 +20,7 @@ use tokio_tungstenite::{
 };
 use tracing::{debug, warn};
 
-use crate::metrics::OpRBuilderMetrics;
+use crate::{metrics::OpRBuilderMetrics, tokio_metrics::MonitoredTask};
 
 /// A WebSockets publisher that accepts connections from client websockets and broadcasts to them
 /// updates about new flashblocks. It maintains a count of sent messages and active subscriptions.
@@ -34,7 +34,11 @@ pub(super) struct WebSocketPublisher {
 }
 
 impl WebSocketPublisher {
-    pub(super) fn new(addr: SocketAddr, metrics: Arc<OpRBuilderMetrics>) -> io::Result<Self> {
+    pub(super) fn new(
+        addr: SocketAddr,
+        metrics: Arc<OpRBuilderMetrics>,
+        task_monitor: &MonitoredTask,
+    ) -> io::Result<Self> {
         let (pipe, _) = broadcast::channel(100);
         let (term, _) = watch::channel(false);
 
@@ -42,14 +46,14 @@ impl WebSocketPublisher {
         let subs = Arc::new(AtomicUsize::new(0));
         let listener = TcpListener::bind(addr)?;
 
-        tokio::spawn(listener_loop(
+        tokio::spawn(task_monitor.instrument(listener_loop(
             listener,
             metrics,
             pipe.subscribe(),
             term.subscribe(),
             Arc::clone(&sent),
             Arc::clone(&subs),
-        ));
+        )));
 
         Ok(Self {
             sent,
diff --git a/crates/op-rbuilder/src/lib.rs b/crates/op-rbuilder/src/lib.rs
index 7817ba2d0..1ede126e2 100644
--- a/crates/op-rbuilder/src/lib.rs
+++ b/crates/op-rbuilder/src/lib.rs
@@ -7,6 +7,7 @@ pub mod metrics;
 mod monitor_tx_pool;
 pub mod primitives;
 pub mod revert_protection;
+pub mod tokio_metrics;
 pub mod traits;
 pub mod tx;
 pub mod tx_signer;
diff --git a/crates/op-rbuilder/src/tokio_metrics.rs b/crates/op-rbuilder/src/tokio_metrics.rs
new file mode 100644
index 000000000..81d4d4492
--- /dev/null
+++ b/crates/op-rbuilder/src/tokio_metrics.rs
@@ -0,0 +1,282 @@
+//! Tokio runtime metrics instrumentation using the tokio-metrics crate.
+//!
+//! This module provides task-level metrics for monitoring spawned tokio tasks
+//! in the flashblocks builder, including poll times, idle times, and scheduling delays.
+//! It also provides global runtime metrics for monitoring the tokio runtime itself.
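+//!
+//! A minimal usage sketch, mirroring how the flashblocks service wires this up
+//! (the instrumented future here is a stand-in):
+//!
+//! ```ignore
+//! use std::{sync::Arc, time::Duration};
+//!
+//! let task_metrics = Arc::new(FlashblocksTaskMetrics::new());
+//! // Instrument a spawned future so its polls, idle periods, and scheduling
+//! // delays are tracked by the corresponding monitor.
+//! tokio::spawn(task_metrics.flashblock_timer.instrument(async { /* ... */ }));
+//! // Start the background collector that samples all monitors once per second.
+//! task_metrics.clone().spawn_metrics_collector(Duration::from_secs(1));
+//! ```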
+
+use reth_metrics::{
+    Metrics,
+    metrics::{Counter, Gauge, Histogram},
+};
+use std::{fmt, future::Future, sync::Arc, time::Duration};
+use tokio_metrics::{RuntimeMetrics, RuntimeMonitor, TaskMetrics, TaskMonitor};
+
+/// Metrics for a single monitored tokio task.
+#[derive(Metrics, Clone)]
+#[metrics(scope = "op_rbuilder.tokio_task")]
+pub struct TokioTaskMetricsRecorder {
+    /// Total number of times the task has been instrumented (spawned)
+    pub instrumented_count: Counter,
+    /// Total number of times the task was dropped
+    pub dropped_count: Counter,
+    /// Number of tasks polled for the first time
+    pub first_poll_count: Counter,
+    /// Total poll count across all intervals
+    pub total_poll_count: Counter,
+    /// Total time spent polling (seconds)
+    pub total_poll_duration_seconds: Histogram,
+    /// Mean poll duration per interval (microseconds)
+    pub mean_poll_duration_us: Histogram,
+    /// Total time spent idle (seconds)
+    pub total_idle_duration_seconds: Histogram,
+    /// Mean idle duration per interval (microseconds)
+    pub mean_idle_duration_us: Histogram,
+    /// Total time spent waiting to be scheduled (seconds)
+    pub total_scheduled_duration_seconds: Histogram,
+    /// Mean scheduled wait duration per interval (microseconds)
+    pub mean_scheduled_duration_us: Histogram,
+    /// Number of polls that exceeded the slow-poll threshold
+    pub slow_poll_count: Counter,
+    /// Total duration of slow polls (seconds)
+    pub slow_poll_duration_seconds: Histogram,
+    /// Number of times the task experienced a short scheduling delay
+    pub short_delay_count: Counter,
+    /// Number of times the task experienced a long scheduling delay
+    pub long_delay_count: Counter,
+    /// Total duration of long scheduling delays (seconds)
+    pub long_delay_duration_seconds: Histogram,
+}
+
+/// Metrics for the global tokio runtime.
+///
+/// Note: Only stable tokio metrics are exposed here. Additional metrics like
+/// steal counts, schedule counts, and overflow counts require the `tokio_unstable` flag.
+#[derive(Metrics, Clone)]
+#[metrics(scope = "op_rbuilder.tokio_runtime")]
+pub struct TokioRuntimeMetricsRecorder {
+    /// Number of worker threads in the runtime
+    pub workers_count: Gauge,
+    /// Current number of alive tasks in the runtime
+    pub live_tasks_count: Gauge,
+    /// Total number of times worker threads parked
+    pub total_park_count: Counter,
+    /// Max park count across all workers in the interval
+    pub max_park_count: Histogram,
+    /// Min park count across all workers in the interval
+    pub min_park_count: Histogram,
+    /// Total time workers spent busy executing tasks (seconds)
+    pub total_busy_duration_seconds: Histogram,
+    /// Max busy duration across all workers (seconds)
+    pub max_busy_duration_seconds: Histogram,
+    /// Min busy duration across all workers (seconds)
+    pub min_busy_duration_seconds: Histogram,
+    /// Depth of the global queue
+    pub global_queue_depth: Gauge,
+    /// Elapsed time covered by the metrics interval (for rate calculations)
+    pub elapsed_seconds: Histogram,
+}
+
+impl TokioRuntimeMetricsRecorder {
+    /// Record metrics from a RuntimeMetrics snapshot.
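+    ///
+    /// Snapshots produced by `RuntimeMonitor::intervals()` are per-interval
+    /// deltas, so counters are incremented by the interval's counts and
+    /// histograms record the interval's values as individual samples.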
+    pub fn record(&self, metrics: &RuntimeMetrics) {
+        self.workers_count.set(metrics.workers_count as f64);
+        self.live_tasks_count.set(metrics.live_tasks_count as f64);
+        self.total_park_count.increment(metrics.total_park_count);
+        self.max_park_count.record(metrics.max_park_count as f64);
+        self.min_park_count.record(metrics.min_park_count as f64);
+        self.total_busy_duration_seconds
+            .record(metrics.total_busy_duration.as_secs_f64());
+        self.max_busy_duration_seconds
+            .record(metrics.max_busy_duration.as_secs_f64());
+        self.min_busy_duration_seconds
+            .record(metrics.min_busy_duration.as_secs_f64());
+        self.global_queue_depth
+            .set(metrics.global_queue_depth as f64);
+        self.elapsed_seconds.record(metrics.elapsed.as_secs_f64());
+    }
+}
+
+/// A wrapper around tokio_metrics::TaskMonitor that records metrics.
+#[derive(Clone)]
+pub struct MonitoredTask {
+    monitor: TaskMonitor,
+    recorder: TokioTaskMetricsRecorder,
+    task_name: &'static str,
+}
+
+impl MonitoredTask {
+    /// Create a new monitored task with the given name.
+    pub fn new(task_name: &'static str) -> Self {
+        Self {
+            monitor: TaskMonitor::new(),
+            recorder: TokioTaskMetricsRecorder::new_with_labels(&[("task", task_name)]),
+            task_name,
+        }
+    }
+
+    /// Instrument a future to be monitored by this task monitor.
+    pub fn instrument<F: Future>(&self, future: F) -> tokio_metrics::Instrumented<F> {
+        self.monitor.instrument(future)
+    }
+
+    /// Returns the underlying task monitor.
+    pub fn monitor(&self) -> &TaskMonitor {
+        &self.monitor
+    }
+
+    /// Returns the task's name label.
+    pub fn task_name(&self) -> &'static str {
+        self.task_name
+    }
+
+    /// Record metrics from a TaskMetrics snapshot.
+    pub fn record_metrics(&self, metrics: &TaskMetrics) {
+        self.recorder
+            .instrumented_count
+            .increment(metrics.instrumented_count);
+        self.recorder.dropped_count.increment(metrics.dropped_count);
+        self.recorder
+            .first_poll_count
+            .increment(metrics.first_poll_count);
+        self.recorder
+            .total_poll_count
+            .increment(metrics.total_poll_count);
+
+        self.recorder
+            .total_poll_duration_seconds
+            .record(metrics.total_poll_duration.as_secs_f64());
+
+        if metrics.total_poll_count > 0 {
+            let mean_poll_us = metrics.mean_poll_duration().as_micros() as f64;
+            self.recorder.mean_poll_duration_us.record(mean_poll_us);
+        }
+
+        self.recorder
+            .total_idle_duration_seconds
+            .record(metrics.total_idle_duration.as_secs_f64());
+
+        if metrics.total_idled_count > 0 {
+            let mean_idle_us = metrics.mean_idle_duration().as_micros() as f64;
+            self.recorder.mean_idle_duration_us.record(mean_idle_us);
+        }
+
+        self.recorder
+            .total_scheduled_duration_seconds
+            .record(metrics.total_scheduled_duration.as_secs_f64());
+
+        if metrics.total_scheduled_count > 0 {
+            let mean_scheduled_us = metrics.mean_scheduled_duration().as_micros() as f64;
+            self.recorder
+                .mean_scheduled_duration_us
+                .record(mean_scheduled_us);
+        }
+
+        self.recorder
+            .slow_poll_count
+            .increment(metrics.total_slow_poll_count);
+        self.recorder
+            .slow_poll_duration_seconds
+            .record(metrics.total_slow_poll_duration.as_secs_f64());
+
+        self.recorder
+            .short_delay_count
+            .increment(metrics.total_short_delay_count);
+        self.recorder
+            .long_delay_count
+            .increment(metrics.total_long_delay_count);
+        self.recorder
+            .long_delay_duration_seconds
+            .record(metrics.total_long_delay_duration.as_secs_f64());
+    }
+}
+
+/// Collection of task monitors for the flashblocks builder.
+#[derive(Clone)]
+pub struct FlashblocksTaskMetrics {
+    /// Monitor for the flashblock timer task
+    pub flashblock_timer: MonitoredTask,
+    /// Monitor for the payload builder service
+    pub payload_builder_service: MonitoredTask,
+    /// Monitor for the payload handler
+    pub payload_handler: MonitoredTask,
+    /// Monitor for the websocket listener task
+    pub websocket_publisher: MonitoredTask,
+    /// Global runtime metrics recorder
+    runtime_recorder: TokioRuntimeMetricsRecorder,
+}
+
+impl Default for FlashblocksTaskMetrics {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FlashblocksTaskMetrics {
+    pub fn new() -> Self {
+        Self {
+            flashblock_timer: MonitoredTask::new("flashblock_timer"),
+            payload_builder_service: MonitoredTask::new("payload_builder_service"),
+            payload_handler: MonitoredTask::new("payload_handler"),
+            websocket_publisher: MonitoredTask::new("websocket_publisher"),
+            runtime_recorder: TokioRuntimeMetricsRecorder::default(),
+        }
+    }
+
+    /// Spawn a background task that periodically records metrics from all monitors.
+    ///
+    /// This should be called once at startup to begin metric collection.
+    pub fn spawn_metrics_collector(self: Arc<Self>, interval: Duration) {
+        let metrics = self;
+        tokio::spawn(async move {
+            let mut timer = tokio::time::interval(interval);
+
+            // Get runtime monitor for the current tokio runtime
+            let runtime_monitor = RuntimeMonitor::new(&tokio::runtime::Handle::current());
+            let mut runtime_intervals = runtime_monitor.intervals();
+
+            // Get interval iterators for each task monitor
+            let mut flashblock_timer_intervals = metrics.flashblock_timer.monitor.intervals();
+            let mut payload_builder_intervals = metrics.payload_builder_service.monitor.intervals();
+            let mut payload_handler_intervals = metrics.payload_handler.monitor.intervals();
+            let mut websocket_publisher_intervals = metrics.websocket_publisher.monitor.intervals();
+
+            loop {
+                timer.tick().await;
+
+                // Record global runtime metrics
+                if let Some(runtime_metrics) = runtime_intervals.next() {
+                    metrics.runtime_recorder.record(&runtime_metrics);
+                }
+
+                // Record metrics for each task
+                if let Some(task_metrics) = flashblock_timer_intervals.next() {
+                    metrics.flashblock_timer.record_metrics(&task_metrics);
+                }
+                if let Some(task_metrics) = payload_builder_intervals.next() {
+                    metrics
+                        .payload_builder_service
+                        .record_metrics(&task_metrics);
+                }
+                if let Some(task_metrics) = payload_handler_intervals.next() {
+                    metrics.payload_handler.record_metrics(&task_metrics);
+                }
+                if let Some(task_metrics) = websocket_publisher_intervals.next() {
+                    metrics.websocket_publisher.record_metrics(&task_metrics);
+                }
+            }
+        });
+    }
+}
+
+impl fmt::Debug for FlashblocksTaskMetrics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("FlashblocksTaskMetrics")
+            .field("flashblock_timer", &self.flashblock_timer.task_name())
+            .field(
+                "payload_builder_service",
+                &self.payload_builder_service.task_name(),
+            )
+            .field("payload_handler", &self.payload_handler.task_name())
+            .field("websocket_publisher", &self.websocket_publisher.task_name())
+            .field("runtime_monitor", &"enabled")
+            .finish()
+    }
+}