Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/op-rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }

rollup-boost = { git = "https://github.com/flashbots/rollup-boost", rev = "e74a1fd01366e4ddd13515da4efda59cdc8fbce0" }
thiserror = "2.0.11"
url = "2.5.3"

[target.'cfg(unix)'.dependencies]
tikv-jemallocator = { version = "0.6", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions crates/op-rbuilder/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use reth_optimism_node::args::RollupArgs;

use crate::tx_signer::Signer;
use alloy_transport_http::reqwest::Url;

/// Parameters for rollup configuration
#[derive(Debug, Clone, Default, PartialEq, Eq, clap::Args)]
Expand All @@ -17,4 +18,7 @@ pub struct OpRbuilderArgs {
/// Builder secret key for signing last transaction in block
#[arg(long = "rollup.builder-secret-key", env = "BUILDER_SECRET_KEY")]
pub builder_signer: Option<Signer>,
/// URL of the supervisor service for transaction validation
#[arg(long = "rollup.supervisor-url", env = "SUPERVISOR_URL")]
pub supervisor_url: Option<Url>,
}
9 changes: 4 additions & 5 deletions crates/op-rbuilder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ fn main() {
let op_node = OpNode::new(rollup_args.clone());
let handle = builder
.with_types::<OpNode>()
.with_components(
op_node
.components()
.payload(CustomOpPayloadBuilder::new(builder_args.builder_signer)),
)
.with_components(op_node.components().payload(CustomOpPayloadBuilder::new(
builder_args.builder_signer,
builder_args.supervisor_url,
)))
.with_add_ons(
OpAddOnsBuilder::default()
.with_sequencer(rollup_args.sequencer_http.clone())
Expand Down
24 changes: 24 additions & 0 deletions crates/op-rbuilder/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ pub struct OpRBuilderMetrics {
pub tx_byte_size: Histogram,
/// Number of reverted transactions
pub num_reverted_tx: Counter,
/// Number of cross-chain transactions
pub num_cross_chain_tx: Counter,
/// Number of cross-chain transactions that didn't pass supervisor validation
pub num_cross_chain_tx_fail: Counter,
/// Number of cross-chain transactions that weren't verified because of the timeout
pub num_cross_chain_tx_timeout: Counter,
/// Number of cross-chain transactions that weren't verified because of the server error
pub num_cross_chain_tx_server_error: Counter,
}

impl OpRBuilderMetrics {
Expand All @@ -70,4 +78,20 @@ impl OpRBuilderMetrics {
pub fn set_builder_balance(&self, balance: f64) {
self.builder_balance.set(balance);
}

pub fn inc_num_cross_chain_tx_fail(&self) {
self.num_cross_chain_tx_fail.increment(1);
}

pub fn inc_num_cross_chain_tx(&self) {
self.num_cross_chain_tx.increment(1);
}

pub fn inc_num_cross_chain_tx_timeout(&self) {
self.num_cross_chain_tx_timeout.increment(1);
}

pub fn inc_num_cross_chain_tx_server_error(&self) {
self.num_cross_chain_tx_server_error.increment(1);
}
}
137 changes: 120 additions & 17 deletions crates/op-rbuilder/src/payload_builder_vanilla.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::generator::BlockPayloadJobGenerator;
use crate::generator::BuildArguments;
use crate::{
generator::{BlockCell, PayloadBuilder},
generator::{BlockCell, BlockPayloadJobGenerator, BuildArguments, PayloadBuilder},
metrics::OpRBuilderMetrics,
primitives::kona::{
ExecutingMessage, ExecutingMessageValidator, ExecutingMessageValidatorError, SafetyLevel,
SupervisorValidator,
},
tx_signer::Signer,
};
use alloy_consensus::constants::EMPTY_WITHDRAWALS;
Expand All @@ -15,6 +17,7 @@ use alloy_primitives::private::alloy_rlp::Encodable;
use alloy_primitives::{Address, Bytes, TxHash, TxKind, B256, U256};
use alloy_rpc_types_engine::PayloadId;
use alloy_rpc_types_eth::Withdrawals;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use op_alloy_consensus::{OpDepositReceipt, OpTypedTransaction};
use reth::builder::{components::PayloadServiceBuilder, node::FullNodeTypes, BuilderContext};
use reth::core::primitives::InMemorySize;
Expand Down Expand Up @@ -78,15 +81,21 @@ use std::{fmt::Display, sync::Arc, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{info, trace, warn};

#[derive(Debug, Clone, Copy, Default)]
use url::Url;

#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct CustomOpPayloadBuilder {
builder_signer: Option<Signer>,
supervisor_url: Option<Url>,
}

impl CustomOpPayloadBuilder {
pub fn new(builder_signer: Option<Signer>) -> Self {
Self { builder_signer }
pub fn new(builder_signer: Option<Signer>, supervisor_url: Option<Url>) -> Self {
Self {
builder_signer,
supervisor_url,
}
}
}

Expand Down Expand Up @@ -116,6 +125,7 @@ where
pool,
ctx.provider().clone(),
Arc::new(BasicOpReceiptBuilder::default()),
self.supervisor_url.clone(),
))
}

Expand Down Expand Up @@ -197,6 +207,8 @@ pub struct OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N: NodePrimitives, T
pub metrics: OpRBuilderMetrics,
/// Node primitive types.
pub receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
/// Client to execute supervisor validation
pub supervisor_client: Option<HttpClient>,
}

impl<Pool, Client, EvmConfig, N: NodePrimitives>
Expand All @@ -209,13 +221,15 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
pool: Pool,
client: Client,
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
supervisor_url: Option<Url>,
) -> Self {
Self::with_builder_config(
evm_config,
builder_signer,
pool,
client,
receipt_builder,
supervisor_url,
Default::default(),
)
}
Expand All @@ -226,8 +240,14 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
pool: Pool,
client: Client,
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
supervisor_url: Option<Url>,
config: OpBuilderConfig,
) -> Self {
let supervisor_client = supervisor_url.map(|url| {
HttpClientBuilder::default()
.build(url)
.expect("building supervisor http client")
});
Self {
pool,
client,
Expand All @@ -237,6 +257,7 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
best_transactions: (),
metrics: Default::default(),
builder_signer,
supervisor_client,
}
}
}
Expand Down Expand Up @@ -270,7 +291,7 @@ where
},
|hashes| {
#[allow(clippy::unit_arg)]
self.best_transactions.remove_reverted(pool.clone(), hashes)
self.best_transactions.remove_invalid(pool.clone(), hashes)
},
)? {
BuildOutcome::Better { payload, .. } => {
Expand Down Expand Up @@ -344,6 +365,7 @@ where
receipt_builder: self.receipt_builder.clone(),
builder_signer: self.builder_signer,
metrics: Default::default(),
supervisor_client: self.supervisor_client.clone(),
};

let builder = OpBuilder::new(best, remove_reverted);
Expand Down Expand Up @@ -406,7 +428,7 @@ pub struct OpBuilder<'a, Txs> {
best: Box<dyn FnOnce(BestTransactionsAttributes) -> Txs + 'a>,
/// Removes reverted transactions from the tx pool
#[debug(skip)]
remove_reverted: Box<dyn FnOnce(Vec<TxHash>) + 'a>,
remove_invalid: Box<dyn FnOnce(Vec<TxHash>) + 'a>,
}

impl<'a, Txs> OpBuilder<'a, Txs> {
Expand All @@ -416,7 +438,7 @@ impl<'a, Txs> OpBuilder<'a, Txs> {
) -> Self {
Self {
best: Box::new(best),
remove_reverted: Box::new(remove_reverted),
remove_invalid: Box::new(remove_reverted),
}
}
}
Expand All @@ -438,7 +460,7 @@ impl<Txs> OpBuilder<'_, Txs> {
{
let Self {
best,
remove_reverted,
remove_invalid,
} = self;
info!(target: "payload_builder", id=%ctx.payload_id(), parent_header = ?ctx.parent().hash(), parent_number = ctx.parent().number, "building new payload");

Expand Down Expand Up @@ -535,7 +557,7 @@ impl<Txs> OpBuilder<'_, Txs> {
None
};

remove_reverted(info.reverted_tx_hashes.iter().copied().collect());
remove_invalid(info.invalid_tx_hashes.iter().copied().collect());

let payload = ExecutedPayload {
info,
Expand Down Expand Up @@ -704,8 +726,8 @@ pub trait OpPayloadTransactions<Transaction>: Clone + Send + Sync + Unpin + 'sta
attr: BestTransactionsAttributes,
) -> impl PayloadTransactions<Transaction = Transaction>;

/// Removes reverted transactions from the tx pool
fn remove_reverted<Pool: TransactionPool<Transaction = Transaction>>(
/// Removes invalid transactions from the tx pool
fn remove_invalid<Pool: TransactionPool<Transaction = Transaction>>(
&self,
pool: Pool,
hashes: Vec<TxHash>,
Expand All @@ -721,7 +743,7 @@ impl<T: PoolTransaction> OpPayloadTransactions<T> for () {
BestPayloadTransactions::new(pool.best_transactions_with_attributes(attr))
}

fn remove_reverted<Pool: TransactionPool<Transaction = T>>(
fn remove_invalid<Pool: TransactionPool<Transaction = T>>(
&self,
pool: Pool,
hashes: Vec<TxHash>,
Expand Down Expand Up @@ -755,7 +777,7 @@ pub struct ExecutionInfo<N: NodePrimitives> {
/// Tracks fees from executed mempool transactions
pub total_fees: U256,
/// Tracks the reverted transaction hashes to remove from the transaction pool
pub reverted_tx_hashes: HashSet<TxHash>,
pub invalid_tx_hashes: HashSet<TxHash>,
}

impl<N: NodePrimitives> ExecutionInfo<N> {
Expand All @@ -768,7 +790,7 @@ impl<N: NodePrimitives> ExecutionInfo<N> {
cumulative_gas_used: 0,
cumulative_da_bytes_used: 0,
total_fees: U256::ZERO,
reverted_tx_hashes: HashSet::new(),
invalid_tx_hashes: HashSet::new(),
}
}

Expand Down Expand Up @@ -820,6 +842,8 @@ pub struct OpPayloadBuilderCtx<EvmConfig: ConfigureEvmEnv, ChainSpec, N: NodePri
pub builder_signer: Option<Signer>,
/// The metrics for the builder
pub metrics: OpRBuilderMetrics,
/// Client to execute supervisor validation
pub supervisor_client: Option<HttpClient>,
}

impl<EvmConfig, ChainSpec, N> OpPayloadBuilderCtx<EvmConfig, ChainSpec, N>
Expand Down Expand Up @@ -1110,6 +1134,27 @@ where
return Err(PayloadBuilderError::EvmExecutionError(Box::new(err)));
}
};
// op-supervisor validation
match self.validate_supervisor_messages(&result)? {
Ok(()) => (),
Err(err) => match err {
ExecutingMessageValidatorError::SupervisorServerError(err) => {
warn!(target: "payload_builder", %err, ?sequencer_tx, "Supervisor error, skipping.");
self.metrics.inc_num_cross_chain_tx_server_error();
continue;
}
ExecutingMessageValidatorError::ValidationTimeout(_) => {
trace!(target: "payload_builder", %err, ?sequencer_tx, "Executing message validation timed out, skipping.");
self.metrics.inc_num_cross_chain_tx_timeout();
continue;
}
err => {
trace!(target: "payload_builder", %err, ?sequencer_tx, "Executing message rejected.");
self.metrics.inc_num_cross_chain_tx_fail();
continue;
}
},
}

// commit changes
evm.db_mut().commit(state);
Expand Down Expand Up @@ -1208,6 +1253,34 @@ where
}
};

match self.validate_supervisor_messages(&result)? {
Ok(()) => (),
Err(err) => {
match err {
ExecutingMessageValidatorError::SupervisorServerError(err) => {
trace!(target: "payload_builder", %err, ?tx, "Supervisor error, skipping.");
self.metrics.inc_num_cross_chain_tx_server_error();
continue;
}
ExecutingMessageValidatorError::ValidationTimeout(_) => {
trace!(target: "payload_builder", %err, ?tx, "Executing message validation timed out, skipping.");
self.metrics.inc_num_cross_chain_tx_timeout();
continue;
}
err => {
trace!(target: "payload_builder", %err, ?tx, "Executing message rejected.");
self.metrics.inc_num_cross_chain_tx_fail();
// It's possible that transaction invalid now, but would be valid later.
// We should keep limited queue for transactions that could become valid.
// We should have the limit to ensure that builder won't get overwhelmed.
best_txs.mark_invalid(tx.signer(), tx.nonce());
info.invalid_tx_hashes.insert(*tx.tx_hash());
continue;
}
}
}
}

self.metrics
.tx_simulation_duration
.record(tx_simulation_start_time.elapsed());
Expand All @@ -1219,7 +1292,7 @@ where
num_txs_simulated_fail += 1;
trace!(target: "payload_builder", ?tx, "skipping reverted transaction");
best_txs.mark_invalid(tx.signer(), tx.nonce());
info.reverted_tx_hashes.insert(*tx.tx_hash());
info.invalid_tx_hashes.insert(*tx.tx_hash());
continue;
}

Expand Down Expand Up @@ -1266,6 +1339,36 @@ where
Ok(None)
}

pub fn validate_supervisor_messages(
&self,
result: &ExecutionResult,
) -> Result<Result<(), ExecutingMessageValidatorError>, PayloadBuilderError> {
if let Some(client) = &self.supervisor_client {
let executing_messages =
SupervisorValidator::parse_messages(result.clone().into_logs().as_slice())
.flatten()
.collect::<Vec<ExecutingMessage>>();
if !executing_messages.is_empty() {
self.metrics.inc_num_cross_chain_tx();
let (channel_tx, rx) = std::sync::mpsc::channel();
tokio::task::block_in_place(move || {
let res = tokio::runtime::Handle::current().block_on(async {
SupervisorValidator::validate_messages(
client,
executing_messages.as_slice(),
SafetyLevel::CrossUnsafe,
Some(core::time::Duration::from_millis(100)),
)
.await
});
let _ = channel_tx.send(res);
});
return rx.recv().map_err(|_| PayloadBuilderError::ChannelClosed);
}
}
Ok(Ok(()))
}

pub fn add_builder_tx<DB>(
&self,
info: &mut ExecutionInfo<N>,
Expand Down
Loading
Loading