Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ lazy_static = "1.4.0"
tikv-jemallocator = { version = "0.6" }
tracing = "0.1.37"
metrics = { version = "0.24.1" }
tokio-metrics = { version = "0.4.7" }
ahash = "0.8.6"
time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] }
vergen = "9.0.4"
Expand Down
1 change: 1 addition & 0 deletions crates/op-rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ either.workspace = true
metrics.workspace = true
serde_json.workspace = true
tokio-util.workspace = true
tokio-metrics.workspace = true
thiserror.workspace = true
parking_lot.workspace = true
url.workspace = true
Expand Down
25 changes: 23 additions & 2 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
gas_limiter::AddressGasLimiter,
metrics::OpRBuilderMetrics,
primitives::reth::ExecutionInfo,
tokio_metrics::FlashblocksTaskMetrics,
traits::{ClientBounds, PoolBounds},
};
use alloy_consensus::{
Expand Down Expand Up @@ -187,6 +188,8 @@ pub(super) struct OpPayloadBuilder<Pool, Client, BuilderTx> {
pub builder_tx: BuilderTx,
/// Rate limiting based on gas. This is an optional feature.
pub address_gas_limiter: AddressGasLimiter,
/// Tokio task metrics for monitoring spawned tasks
pub task_metrics: Arc<FlashblocksTaskMetrics>,
}

impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
Expand All @@ -201,6 +204,7 @@ impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
payload_tx: mpsc::Sender<OpBuiltPayload>,
ws_pub: Arc<WebSocketPublisher>,
metrics: Arc<OpRBuilderMetrics>,
task_metrics: Arc<FlashblocksTaskMetrics>,
) -> Self {
let address_gas_limiter = AddressGasLimiter::new(config.gas_limiter_config.clone());
Self {
Expand All @@ -213,6 +217,7 @@ impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
metrics,
builder_tx,
address_gas_limiter,
task_metrics,
}
}
}
Expand Down Expand Up @@ -525,8 +530,10 @@ where
std::sync::mpsc::sync_channel((self.config.flashblocks_per_block() + 1) as usize);
let build_at_interval_end = self.config.specific.build_at_interval_end;

tokio::spawn({
tokio::spawn(self.task_metrics.flashblock_timer.instrument({
let block_cancel = block_cancel.clone();
let flashblock_index = ctx.flashblock_index();
let block_number = ctx.block_number();

async move {
// If NOT building at interval end, send immediate signal to build first
Expand Down Expand Up @@ -555,6 +562,13 @@ where
loop {
tokio::select! {
_ = timer.tick() => {
debug!(
target: "payload_builder",
payload_id = ?fb_payload.payload_id,
flashblock_index = flashblock_index,
block_number = block_number,
"Triggering next flashblock with timer",
);
// cancel current payload building job
fb_cancel.cancel();
fb_cancel = block_cancel.child_token();
Expand Down Expand Up @@ -590,14 +604,21 @@ where
}
}
}
});
}));

// Process flashblocks - block on async channel receive
loop {
// Wait for signal before building flashblock.
// If build_at_interval_end is false, an immediate signal is sent so we don't wait.
// If build_at_interval_end is true, we wait for the timer tick (first_flashblock_offset).
if let Ok(new_fb_cancel) = rx.recv() {
debug!(
target: "payload_builder",
payload_id = ?fb_payload.payload_id,
flashblock_index = ctx.flashblock_index(),
block_number = ctx.block_number(),
"Received signal to build flashblock",
);
ctx = ctx.with_cancel(new_fb_cancel);
} else {
// Channel closed - block building cancelled
Expand Down
37 changes: 29 additions & 8 deletions crates/op-rbuilder/src/builders/flashblocks/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
},
flashtestations::service::bootstrap_flashtestations,
metrics::OpRBuilderMetrics,
tokio_metrics::FlashblocksTaskMetrics,
traits::{NodeBounds, PoolBounds},
};
use eyre::WrapErr as _;
Expand All @@ -23,7 +24,7 @@ use reth_node_builder::{BuilderContext, components::PayloadServiceBuilder};
use reth_optimism_evm::OpEvmConfig;
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_provider::CanonStateSubscriptions;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

pub struct FlashblocksServiceBuilder(pub BuilderConfig<FlashblocksConfig>);

Expand Down Expand Up @@ -106,12 +107,16 @@ impl FlashblocksServiceBuilder {
};

let metrics = Arc::new(OpRBuilderMetrics::default());
let task_metrics = Arc::new(FlashblocksTaskMetrics::new());
let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16);

let ws_pub: Arc<WebSocketPublisher> =
WebSocketPublisher::new(self.0.specific.ws_addr, metrics.clone())
.wrap_err("failed to create ws publisher")?
.into();
let ws_pub: Arc<WebSocketPublisher> = WebSocketPublisher::new(
self.0.specific.ws_addr,
metrics.clone(),
&task_metrics.websocket_publisher,
)
.wrap_err("failed to create ws publisher")?
.into();
let payload_builder = OpPayloadBuilder::new(
OpEvmConfig::optimism(ctx.chain_spec()),
pool,
Expand All @@ -121,6 +126,7 @@ impl FlashblocksServiceBuilder {
built_payload_tx,
ws_pub.clone(),
metrics.clone(),
task_metrics.clone(),
);
let payload_job_config = BasicPayloadJobGeneratorConfig::default();

Expand Down Expand Up @@ -154,13 +160,28 @@ impl FlashblocksServiceBuilder {
cancel,
);

ctx.task_executor()
.spawn_critical("custom payload builder service", Box::pin(payload_service));
ctx.task_executor().spawn_critical(
"custom payload builder service",
Box::pin(
task_metrics
.payload_builder_service
.instrument(payload_service),
),
);
ctx.task_executor().spawn_critical(
"flashblocks payload handler",
Box::pin(payload_handler.run()),
Box::pin(
task_metrics
.payload_handler
.instrument(payload_handler.run()),
),
);

// Spawn the tokio metrics collector (records metrics every second)
task_metrics
.clone()
.spawn_metrics_collector(Duration::from_secs(1));

tracing::info!("Flashblocks payload builder service started");
Ok(payload_builder_handle)
}
Expand Down
12 changes: 8 additions & 4 deletions crates/op-rbuilder/src/builders/flashblocks/wspub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio_tungstenite::{
};
use tracing::{debug, warn};

use crate::metrics::OpRBuilderMetrics;
use crate::{metrics::OpRBuilderMetrics, tokio_metrics::MonitoredTask};

/// A WebSockets publisher that accepts connections from client websockets and broadcasts to them
/// updates about new flashblocks. It maintains a count of sent messages and active subscriptions.
Expand All @@ -34,22 +34,26 @@ pub(super) struct WebSocketPublisher {
}

impl WebSocketPublisher {
pub(super) fn new(addr: SocketAddr, metrics: Arc<OpRBuilderMetrics>) -> io::Result<Self> {
pub(super) fn new(
addr: SocketAddr,
metrics: Arc<OpRBuilderMetrics>,
task_monitor: &MonitoredTask,
) -> io::Result<Self> {
let (pipe, _) = broadcast::channel(100);
let (term, _) = watch::channel(false);

let sent = Arc::new(AtomicUsize::new(0));
let subs = Arc::new(AtomicUsize::new(0));
let listener = TcpListener::bind(addr)?;

tokio::spawn(listener_loop(
tokio::spawn(task_monitor.instrument(listener_loop(
listener,
metrics,
pipe.subscribe(),
term.subscribe(),
Arc::clone(&sent),
Arc::clone(&subs),
));
)));

Ok(Self {
sent,
Expand Down
1 change: 1 addition & 0 deletions crates/op-rbuilder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod metrics;
mod monitor_tx_pool;
pub mod primitives;
pub mod revert_protection;
pub mod tokio_metrics;
pub mod traits;
pub mod tx;
pub mod tx_signer;
Expand Down
Loading
Loading