diff --git a/Cargo.lock b/Cargo.lock index d9dc6c20d..ef4e89ad8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5702,6 +5702,7 @@ dependencies = [ "op-alloy-rpc-types", "op-alloy-rpc-types-engine", "op-revm", + "opentelemetry 0.29.1", "parking_lot", "rand 0.9.1", "reth", diff --git a/Cargo.toml b/Cargo.toml index 9106e0916..35e489d71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -184,3 +184,4 @@ ahash = "0.8.6" time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] } vergen = "9.0.4" vergen-git2 = "1.0.5" +opentelemetry = { version = "0.29.1", features = ["trace"] } diff --git a/Makefile b/Makefile index 9f275770a..bcaad47d0 100644 --- a/Makefile +++ b/Makefile @@ -44,8 +44,7 @@ docker-image-rbuilder: ## Build a rbuilder Docker image .PHONY: lint lint: ## Run the linters cargo +nightly fmt -- --check - cargo +nightly clippy --features "$(FEATURES)" -- -D warnings - cargo +nightly clippy -p op-rbuilder --features "$(FEATURES)" -- -D warnings + cargo +nightly clippy --all-features -- -D warnings .PHONY: test test: ## Run the tests for rbuilder and op-rbuilder @@ -58,8 +57,7 @@ lt: lint test ## Run "lint" and "test" .PHONY: fmt fmt: ## Format the code cargo +nightly fmt - cargo +nightly clippy --features "$(FEATURES)" --fix --allow-staged --allow-dirty - cargo +nightly clippy -p op-rbuilder --features "$(FEATURES)" --fix --allow-staged --allow-dirty + cargo +nightly clippy --all-features --fix --allow-staged --allow-dirty cargo +nightly fix --allow-staged --allow-dirty .PHONY: bench diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml index 5e8287065..5e4173524 100644 --- a/crates/op-rbuilder/Cargo.toml +++ b/crates/op-rbuilder/Cargo.toml @@ -99,6 +99,7 @@ thiserror.workspace = true parking_lot.workspace = true url.workspace = true anyhow = "1" +opentelemetry = { workspace = true, optional = true } tower = "0.5" futures = "0.3" @@ -177,7 +178,10 @@ testing = [ interop = [] -telemetry = ["reth-tracing-otlp"] +telemetry = [ + 
"reth-tracing-otlp", + "opentelemetry", +] custom-engine-api = [] diff --git a/crates/op-rbuilder/src/args/mod.rs b/crates/op-rbuilder/src/args/mod.rs index 9048fde70..0c0b1a4e1 100644 --- a/crates/op-rbuilder/src/args/mod.rs +++ b/crates/op-rbuilder/src/args/mod.rs @@ -3,7 +3,7 @@ use crate::{ metrics::{CARGO_PKG_VERSION, VERGEN_GIT_SHA}, }; use clap_builder::{CommandFactory, FromArgMatches}; -pub use op::{FlashblocksArgs, OpRbuilderArgs}; +pub use op::{FlashblocksArgs, OpRbuilderArgs, TelemetryArgs}; use playground::PlaygroundOptions; use reth_optimism_cli::{chainspec::OpChainSpecParser, commands::Commands}; diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 4f54adaa5..4cd9095a1 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -52,6 +52,8 @@ pub struct OpRbuilderArgs { pub playground: Option, #[command(flatten)] pub flashblocks: FlashblocksArgs, + #[command(flatten)] + pub telemetry: TelemetryArgs, } impl Default for OpRbuilderArgs { @@ -124,3 +126,15 @@ impl Default for FlashblocksArgs { node_command.ext.flashblocks } } + +/// Parameters for telemetry configuration +#[derive(Debug, Clone, Default, PartialEq, Eq, clap::Args)] +pub struct TelemetryArgs { + /// OpenTelemetry endpoint for traces + #[arg(long = "telemetry.otlp-endpoint", env = "OTEL_EXPORTER_OTLP_ENDPOINT")] + pub otlp_endpoint: Option, + + /// OpenTelemetry headers for authentication + #[arg(long = "telemetry.otlp-headers", env = "OTEL_EXPORTER_OTLP_HEADERS")] + pub otlp_headers: Option, +} diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 218596ec8..99e85b35b 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -334,7 +334,13 @@ impl OpPayloadBuilderCtx { let tx_da_limit = self.da_config.max_da_tx_size(); let mut evm = self.evm_config.evm_with_env(&mut *db, self.evm_env.clone()); - info!(target: 
"payload_builder", block_da_limit = ?block_da_limit, tx_da_size = ?tx_da_limit, block_gas_limit = ?block_gas_limit, "DA limits"); + info!( + target: "payload_builder", + message = "Executing best transactions", + block_da_limit = ?block_da_limit, + tx_da_limit = ?tx_da_limit, + block_gas_limit = ?block_gas_limit, + ); // Remove once we merge Reth 1.4.4 // Fixed in https://github.com/paradigmxyz/reth/pull/16514 @@ -367,9 +373,18 @@ impl OpPayloadBuilderCtx { reverted_hashes.is_some() && !reverted_hashes.unwrap().contains(&tx_hash); let log_txn = |result: TxnExecutionResult| { - debug!(target: "payload_builder", tx_hash = ?tx_hash, tx_da_size = ?tx_da_size, exclude_reverting_txs = ?exclude_reverting_txs, result = %result, "Considering transaction"); + debug!( + target: "payload_builder", + message = "Considering transaction", + tx_hash = ?tx_hash, + tx_da_size = ?tx_da_size, + exclude_reverting_txs = ?exclude_reverting_txs, + result = %result, + ); }; + num_txs_considered += 1; + if let Some(conditional) = conditional { // TODO: ideally we should get this from the txpool stream if !conditional.matches_block_attributes(&block_attr) { @@ -391,7 +406,6 @@ impl OpPayloadBuilderCtx { } } - num_txs_considered += 1; // ensure we still have capacity for this transaction if let Err(result) = info.is_tx_over_limits( tx_da_size, @@ -512,6 +526,13 @@ impl OpPayloadBuilderCtx { .payload_num_tx_simulated_fail .record(num_txs_simulated_fail as f64); + debug!( + target: "payload_builder", + message = "Completed executing best transactions", + txs_executed = num_txs_considered, + txs_applied = num_txs_simulated_success, + txs_rejected = num_txs_simulated_fail, + ); Ok(None) } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 0257f8b71..a7b0f6c21 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -43,7 +43,7 @@ use 
rollup_boost::{ }; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use tracing::{debug, error, warn}; +use tracing::{debug, error, metadata::Level, span, warn}; #[derive(Debug, Default)] struct ExtraExecutionInfo { @@ -138,6 +138,18 @@ where let block_build_start_time = Instant::now(); let BuildArguments { config, cancel, .. } = args; + // We log only every 100th block to reduce usage + let span = if cfg!(feature = "telemetry") && config.parent_header.number % 100 == 0 { + span!(Level::INFO, "build_payload") + } else { + tracing::Span::none() + }; + let _enter = span.enter(); + span.record( + "payload_id", + config.attributes.payload_attributes.id.to_string(), + ); + let chain_spec = self.client.chain_spec(); let timestamp = config.attributes.timestamp(); let block_env_attributes = OpNextBlockEnvAttributes { @@ -211,7 +223,12 @@ where .publish(&fb_payload) .map_err(PayloadBuilderError::other)?; - tracing::info!(target: "payload_builder", "Fallback block built"); + tracing::info!( + target: "payload_builder", + message = "Fallback block built", + payload_id = fb_payload.payload_id.to_string(), + ); + ctx.metrics .payload_num_tx .record(info.executed_transactions.len() as f64); @@ -276,6 +293,16 @@ where // Process flashblocks in a blocking loop loop { + let fb_span = if span.is_none() { + tracing::Span::none() + } else { + span!( + parent: &span, + Level::INFO, + "build_flashblock", + ) + }; + let _entered = fb_span.enter(); // Block on receiving a message, break on cancellation or closed channel let received = tokio::task::block_in_place(|| { // Get runtime handle @@ -430,7 +457,13 @@ where } } flashblock_count += 1; - tracing::info!(target: "payload_builder", "Flashblock {} built", flashblock_count); + tracing::info!( + target: "payload_builder", + message = "Flashblock built", + ?flashblock_count, + current_gas = info.cumulative_gas_used, + current_da = info.cumulative_da_bytes_used, + ); } } } @@ -440,6 +473,11 @@ where self.metrics 
.flashblock_count .record(flashblock_count as f64); + debug!( + target: "payload_builder", + message = "Payload building complete, channel closed or job cancelled" + ); + span.record("flashblock_count", flashblock_count); return Ok(()); } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 4455e7857..948026816 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -20,7 +20,7 @@ use tokio_tungstenite::{ tungstenite::{Message, Utf8Bytes}, WebSocketStream, }; -use tracing::warn; +use tracing::{debug, warn}; use crate::metrics::OpRBuilderMetrics; @@ -65,6 +65,14 @@ impl WebSocketPublisher { // Serialize the payload to a UTF-8 string // serialize only once, then just copy around only a pointer // to the serialized data for each subscription. + debug!( + target: "payload_builder", + message = "Sending flashblock to rollup-boost", + payload_id = payload.payload_id.to_string(), + index = payload.index, + base = payload.base.is_some(), + ); + let serialized = serde_json::to_string(payload)?; let utf8_bytes = Utf8Bytes::from(serialized); diff --git a/crates/op-rbuilder/src/launcher.rs b/crates/op-rbuilder/src/launcher.rs index a8904ed81..d725a49e8 100644 --- a/crates/op-rbuilder/src/launcher.rs +++ b/crates/op-rbuilder/src/launcher.rs @@ -26,12 +26,22 @@ use std::{marker::PhantomData, sync::Arc}; pub fn launch() -> Result<()> { let cli = Cli::parsed(); let mode = cli.builder_mode(); + + #[cfg(feature = "telemetry")] + let telemetry_args = match &cli.command { + reth_optimism_cli::commands::Commands::Node(node_command) => { + node_command.ext.telemetry.clone() + } + _ => Default::default(), + }; + let mut cli_app = cli.configure(); #[cfg(feature = "telemetry")] { - let otlp = reth_tracing_otlp::layer("op-reth"); - cli_app.access_tracing_layers()?.add_layer(otlp); + use crate::primitives::telemetry::setup_telemetry_layer; + let 
telemetry_layer = setup_telemetry_layer(&telemetry_args)?; + cli_app.access_tracing_layers()?.add_layer(telemetry_layer); } cli_app.init_tracing()?; diff --git a/crates/op-rbuilder/src/primitives/mod.rs b/crates/op-rbuilder/src/primitives/mod.rs index 2af3ab8be..da146d306 100644 --- a/crates/op-rbuilder/src/primitives/mod.rs +++ b/crates/op-rbuilder/src/primitives/mod.rs @@ -1,3 +1,4 @@ -pub mod reth; - pub mod bundle; +pub mod reth; +#[cfg(feature = "telemetry")] +pub mod telemetry; diff --git a/crates/op-rbuilder/src/primitives/telemetry.rs b/crates/op-rbuilder/src/primitives/telemetry.rs new file mode 100644 index 000000000..f663156b8 --- /dev/null +++ b/crates/op-rbuilder/src/primitives/telemetry.rs @@ -0,0 +1,30 @@ +use crate::args::TelemetryArgs; +use tracing_subscriber::{filter::Targets, Layer}; + +/// Setup telemetry layer with sampling and custom endpoint configuration +pub fn setup_telemetry_layer( + args: &TelemetryArgs, +) -> eyre::Result> { + use tracing::level_filters::LevelFilter; + + // Otlp uses env vars inside + if let Some(endpoint) = &args.otlp_endpoint { + std::env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint); + } + if let Some(headers) = &args.otlp_headers { + std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", headers); + } + + // Create OTLP layer with custom configuration + let otlp_layer = reth_tracing_otlp::layer("op-rbuilder"); + + // Create a trace filter that sends more data to OTLP but less to stdout + let trace_filter = Targets::new() + .with_default(LevelFilter::WARN) + .with_target("op_rbuilder", LevelFilter::INFO) + .with_target("payload_builder", LevelFilter::DEBUG); + + let filtered_layer = otlp_layer.with_filter(trace_filter); + + Ok(filtered_layer) +} diff --git a/crates/op-rbuilder/src/tests/framework/apis.rs b/crates/op-rbuilder/src/tests/framework/apis.rs index d93572070..519261eea 100644 --- a/crates/op-rbuilder/src/tests/framework/apis.rs +++ b/crates/op-rbuilder/src/tests/framework/apis.rs @@ -92,7 +92,7 @@ impl
EngineApi { pub fn with_localhost_port(port: u16) -> EngineApi { EngineApi:: { address: Address::Http( - format!("http://localhost:{}", port) + format!("http://localhost:{port}") .parse() .expect("Invalid URL"), ),