Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,4 @@ ahash = "0.8.6"
time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] }
vergen = "9.0.4"
vergen-git2 = "1.0.5"
opentelemetry = { version = "0.29.1", features = ["trace"] }
6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ docker-image-rbuilder: ## Build a rbuilder Docker image
.PHONY: lint
lint: ## Run the linters
cargo +nightly fmt -- --check
cargo +nightly clippy --features "$(FEATURES)" -- -D warnings
cargo +nightly clippy -p op-rbuilder --features "$(FEATURES)" -- -D warnings
cargo +nightly clippy --all-features -- -D warnings

.PHONY: test
test: ## Run the tests for rbuilder and op-rbuilder
Expand All @@ -58,8 +57,7 @@ lt: lint test ## Run "lint" and "test"
.PHONY: fmt
fmt: ## Format the code
cargo +nightly fmt
cargo +nightly clippy --features "$(FEATURES)" --fix --allow-staged --allow-dirty
cargo +nightly clippy -p op-rbuilder --features "$(FEATURES)" --fix --allow-staged --allow-dirty
cargo +nightly clippy --all-features --fix --allow-staged --allow-dirty
cargo +nightly fix --allow-staged --allow-dirty

.PHONY: bench
Expand Down
6 changes: 5 additions & 1 deletion crates/op-rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ thiserror.workspace = true
parking_lot.workspace = true
url.workspace = true
anyhow = "1"
opentelemetry = { workspace = true, optional = true }

tower = "0.5"
futures = "0.3"
Expand Down Expand Up @@ -177,7 +178,10 @@ testing = [

interop = []

telemetry = ["reth-tracing-otlp"]
telemetry = [
"reth-tracing-otlp",
"opentelemetry",
]

custom-engine-api = []

Expand Down
2 changes: 1 addition & 1 deletion crates/op-rbuilder/src/args/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
metrics::{CARGO_PKG_VERSION, VERGEN_GIT_SHA},
};
use clap_builder::{CommandFactory, FromArgMatches};
pub use op::{FlashblocksArgs, OpRbuilderArgs};
pub use op::{FlashblocksArgs, OpRbuilderArgs, TelemetryArgs};
use playground::PlaygroundOptions;
use reth_optimism_cli::{chainspec::OpChainSpecParser, commands::Commands};

Expand Down
14 changes: 14 additions & 0 deletions crates/op-rbuilder/src/args/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct OpRbuilderArgs {
pub playground: Option<PathBuf>,
#[command(flatten)]
pub flashblocks: FlashblocksArgs,
#[command(flatten)]
pub telemetry: TelemetryArgs,
}

impl Default for OpRbuilderArgs {
Expand Down Expand Up @@ -124,3 +126,15 @@ impl Default for FlashblocksArgs {
node_command.ext.flashblocks
}
}

/// Parameters for telemetry configuration
#[derive(Debug, Clone, Default, PartialEq, Eq, clap::Args)]
pub struct TelemetryArgs {
/// OpenTelemetry endpoint for traces
#[arg(long = "telemetry.otlp-endpoint", env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
pub otlp_endpoint: Option<String>,

/// OpenTelemetry headers for authentication
#[arg(long = "telemetry.otlp-headers", env = "OTEL_EXPORTER_OTLP_HEADERS")]
pub otlp_headers: Option<String>,
}
27 changes: 24 additions & 3 deletions crates/op-rbuilder/src/builders/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,13 @@ impl OpPayloadBuilderCtx {
let tx_da_limit = self.da_config.max_da_tx_size();
let mut evm = self.evm_config.evm_with_env(&mut *db, self.evm_env.clone());

info!(target: "payload_builder", block_da_limit = ?block_da_limit, tx_da_size = ?tx_da_limit, block_gas_limit = ?block_gas_limit, "DA limits");
info!(
target: "payload_builder",
message = "Executing best transactions",
block_da_limit = ?block_da_limit,
tx_da_limit = ?tx_da_limit,
block_gas_limit = ?block_gas_limit,
);

// Remove once we merge Reth 1.4.4
// Fixed in https://github.com/paradigmxyz/reth/pull/16514
Expand Down Expand Up @@ -367,9 +373,18 @@ impl OpPayloadBuilderCtx {
reverted_hashes.is_some() && !reverted_hashes.unwrap().contains(&tx_hash);

let log_txn = |result: TxnExecutionResult| {
debug!(target: "payload_builder", tx_hash = ?tx_hash, tx_da_size = ?tx_da_size, exclude_reverting_txs = ?exclude_reverting_txs, result = %result, "Considering transaction");
debug!(
target: "payload_builder",
message = "Considering transaction",
tx_hash = ?tx_hash,
tx_da_size = ?tx_da_size,
exclude_reverting_txs = ?exclude_reverting_txs,
result = %result,
);
};

num_txs_considered += 1;

if let Some(conditional) = conditional {
// TODO: ideally we should get this from the txpool stream
if !conditional.matches_block_attributes(&block_attr) {
Expand All @@ -391,7 +406,6 @@ impl OpPayloadBuilderCtx {
}
}

num_txs_considered += 1;
// ensure we still have capacity for this transaction
if let Err(result) = info.is_tx_over_limits(
tx_da_size,
Expand Down Expand Up @@ -512,6 +526,13 @@ impl OpPayloadBuilderCtx {
.payload_num_tx_simulated_fail
.record(num_txs_simulated_fail as f64);

debug!(
target: "payload_builder",
message = "Completed executing best transactions",
txs_executed = num_txs_considered,
txs_applied = num_txs_simulated_success,
txs_rejected = num_txs_simulated_fail,
);
Ok(None)
}

Expand Down
44 changes: 41 additions & 3 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use rollup_boost::{
};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::{debug, error, warn};
use tracing::{debug, error, metadata::Level, span, warn};

#[derive(Debug, Default)]
struct ExtraExecutionInfo {
Expand Down Expand Up @@ -138,6 +138,18 @@ where
let block_build_start_time = Instant::now();
let BuildArguments { config, cancel, .. } = args;

// We log only every 100th block to reduce usage
let span = if cfg!(feature = "telemetry") && config.parent_header.number % 100 == 0 {
span!(Level::INFO, "build_payload")
} else {
tracing::Span::none()
};
let _enter = span.enter();
span.record(
"payload_id",
config.attributes.payload_attributes.id.to_string(),
);

let chain_spec = self.client.chain_spec();
let timestamp = config.attributes.timestamp();
let block_env_attributes = OpNextBlockEnvAttributes {
Expand Down Expand Up @@ -211,7 +223,12 @@ where
.publish(&fb_payload)
.map_err(PayloadBuilderError::other)?;

tracing::info!(target: "payload_builder", "Fallback block built");
tracing::info!(
target: "payload_builder",
message = "Fallback block built",
payload_id = fb_payload.payload_id.to_string(),
);

ctx.metrics
.payload_num_tx
.record(info.executed_transactions.len() as f64);
Expand Down Expand Up @@ -276,6 +293,16 @@ where

// Process flashblocks in a blocking loop
loop {
let fb_span = if span.is_none() {
tracing::Span::none()
} else {
span!(
parent: &span,
Level::INFO,
"build_flashblock",
)
};
let _entered = fb_span.enter();
// Block on receiving a message, break on cancellation or closed channel
let received = tokio::task::block_in_place(|| {
// Get runtime handle
Expand Down Expand Up @@ -430,7 +457,13 @@ where
}
}
flashblock_count += 1;
tracing::info!(target: "payload_builder", "Flashblock {} built", flashblock_count);
tracing::info!(
target: "payload_builder",
message = "Flashblock built",
?flashblock_count,
current_gas = info.cumulative_gas_used,
current_da = info.cumulative_da_bytes_used,
);
}
}
}
Expand All @@ -440,6 +473,11 @@ where
self.metrics
.flashblock_count
.record(flashblock_count as f64);
debug!(
target: "payload_builder",
message = "Payload building complete, channel closed or job cancelled"
);
span.record("flashblock_count", flashblock_count);
return Ok(());
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/op-rbuilder/src/builders/flashblocks/wspub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio_tungstenite::{
tungstenite::{Message, Utf8Bytes},
WebSocketStream,
};
use tracing::warn;
use tracing::{debug, warn};

use crate::metrics::OpRBuilderMetrics;

Expand Down Expand Up @@ -65,6 +65,14 @@ impl WebSocketPublisher {
// Serialize the payload to a UTF-8 string
// serialize only once, then just copy around only a pointer
// to the serialized data for each subscription.
debug!(
target: "payload_builder",
message = "Sending flashblock to rollup-boost",
payload_id = payload.payload_id.to_string(),
index = payload.index,
base = payload.base.is_some(),
);

let serialized = serde_json::to_string(payload)?;
let utf8_bytes = Utf8Bytes::from(serialized);

Expand Down
14 changes: 12 additions & 2 deletions crates/op-rbuilder/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,22 @@ use std::{marker::PhantomData, sync::Arc};
pub fn launch() -> Result<()> {
let cli = Cli::parsed();
let mode = cli.builder_mode();

#[cfg(feature = "telemetry")]
let telemetry_args = match &cli.command {
reth_optimism_cli::commands::Commands::Node(node_command) => {
node_command.ext.telemetry.clone()
}
_ => Default::default(),
};

let mut cli_app = cli.configure();

#[cfg(feature = "telemetry")]
{
let otlp = reth_tracing_otlp::layer("op-reth");
cli_app.access_tracing_layers()?.add_layer(otlp);
use crate::primitives::telemetry::setup_telemetry_layer;
let telemetry_layer = setup_telemetry_layer(&telemetry_args)?;
cli_app.access_tracing_layers()?.add_layer(telemetry_layer);
}

cli_app.init_tracing()?;
Expand Down
5 changes: 3 additions & 2 deletions crates/op-rbuilder/src/primitives/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod reth;

pub mod bundle;
pub mod reth;
#[cfg(feature = "telemetry")]
pub mod telemetry;
30 changes: 30 additions & 0 deletions crates/op-rbuilder/src/primitives/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::args::TelemetryArgs;
use tracing_subscriber::{filter::Targets, Layer};

/// Setup telemetry layer with sampling and custom endpoint configuration
pub fn setup_telemetry_layer(
args: &TelemetryArgs,
) -> eyre::Result<impl Layer<tracing_subscriber::Registry>> {
use tracing::level_filters::LevelFilter;

// Otlp uses evn vars inside
if let Some(endpoint) = &args.otlp_endpoint {
std::env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint);
}
if let Some(headers) = &args.otlp_headers {
std::env::set_var("OTEL_EXPORTER_OTLP_HEADERS", headers);
}

// Create OTLP layer with custom configuration
let otlp_layer = reth_tracing_otlp::layer("op-rbuilder");

// Create a trace filter that sends more data to OTLP but less to stdout
let trace_filter = Targets::new()
.with_default(LevelFilter::WARN)
.with_target("op_rbuilder", LevelFilter::INFO)
.with_target("payload_builder", LevelFilter::DEBUG);

let filtered_layer = otlp_layer.with_filter(trace_filter);

Ok(filtered_layer)
}
2 changes: 1 addition & 1 deletion crates/op-rbuilder/src/tests/framework/apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl EngineApi<Http> {
pub fn with_localhost_port(port: u16) -> EngineApi<Http> {
EngineApi::<Http> {
address: Address::Http(
format!("http://localhost:{}", port)
format!("http://localhost:{port}")
.parse()
.expect("Invalid URL"),
),
Expand Down
Loading