Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/flashblocks-rpc/src/flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl FlashblocksOverlay {
match msg {
Ok(Message::Binary(bytes)) => match try_decode_message(&bytes) {
Ok(payload) => {
info!("Received payload: {:?}", payload);
debug!("Received payload: {:?}", payload);

let _ = sender
.send(InternalMessage::NewPayload(payload))
Expand Down
13 changes: 13 additions & 0 deletions crates/rollup-boost/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ pub struct RollupBoostArgs {
#[arg(long, env)]
pub block_selection_policy: Option<BlockSelectionPolicy>,

/// Should we use the l2 client for computing state root
#[arg(long, env, default_value = "false")]
pub external_state_root: bool,

/// Allow all engine API calls to builder even when marked as unhealthy
/// This is default true assuming no builder CL set up
#[arg(long, env, default_value = "false")]
pub ignore_unhealthy_builders: bool,

#[clap(flatten)]
pub flashblocks: FlashblocksArgs,
}
Expand Down Expand Up @@ -160,6 +169,8 @@ impl RollupBoostArgs {
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
self.external_state_root,
self.ignore_unhealthy_builders,
);

let health_handle = rollup_boost
Expand All @@ -175,6 +186,8 @@ impl RollupBoostArgs {
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
self.external_state_root,
self.ignore_unhealthy_builders,
);

let health_handle = rollup_boost
Expand Down
63 changes: 62 additions & 1 deletion crates/rollup-boost/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alloy_primitives::{B256, Bytes};
use futures::{StreamExt as _, stream};
use moka::future::Cache;

use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadId};
use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadAttributes, PayloadId};
use op_alloy_rpc_types_engine::{
OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
};
Expand Down Expand Up @@ -61,6 +61,67 @@ impl OpExecutionPayloadEnvelope {
.len(),
}
}

pub fn transactions(&self) -> Vec<Bytes> {
match self {
OpExecutionPayloadEnvelope::V3(payload) => payload
.execution_payload
.payload_inner
.payload_inner
.transactions
.clone(),
OpExecutionPayloadEnvelope::V4(payload) => payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.transactions
.clone(),
}
}

pub fn payload_attributes(&self) -> PayloadAttributes {
match self {
OpExecutionPayloadEnvelope::V3(payload) => PayloadAttributes {
timestamp: payload.execution_payload.payload_inner.timestamp(),
prev_randao: payload
.execution_payload
.payload_inner
.payload_inner
.prev_randao,
suggested_fee_recipient: payload
.execution_payload
.payload_inner
.payload_inner
.fee_recipient,
withdrawals: Some(payload.execution_payload.withdrawals().clone()),
parent_beacon_block_root: Some(payload.parent_beacon_block_root),
},
OpExecutionPayloadEnvelope::V4(payload) => PayloadAttributes {
timestamp: payload.execution_payload.payload_inner.timestamp(),
prev_randao: payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.prev_randao,
suggested_fee_recipient: payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.fee_recipient,
withdrawals: Some(
payload
.execution_payload
.payload_inner
.withdrawals()
.clone(),
),
parent_beacon_block_root: Some(payload.parent_beacon_block_root),
},
}
}
}

impl From<OpExecutionPayloadEnvelope> for ExecutionPayload {
Expand Down
125 changes: 116 additions & 9 deletions crates/rollup-boost/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use op_alloy_rpc_types_engine::{
};
use opentelemetry::trace::SpanKind;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing::{error, info, instrument};
use tracing::{debug, error, info, instrument};

pub type Request = HttpRequest;
pub type Response = HttpResponse;
Expand All @@ -49,8 +50,12 @@ pub struct RollupBoostServer<T: EngineApiExt> {
pub builder_client: Arc<T>,
pub payload_trace_context: Arc<PayloadTraceContext>,
block_selection_policy: Option<BlockSelectionPolicy>,
external_state_root: bool,
ignore_unhealthy_builders: bool,
execution_mode: Arc<Mutex<ExecutionMode>>,
probes: Arc<Probes>,
payload_to_fcu_request:
Arc<Mutex<HashMap<PayloadId, (ForkchoiceState, Option<OpPayloadAttributes>)>>>,
}

impl<T> RollupBoostServer<T>
Expand All @@ -63,6 +68,8 @@ where
initial_execution_mode: Arc<Mutex<ExecutionMode>>,
block_selection_policy: Option<BlockSelectionPolicy>,
probes: Arc<Probes>,
external_state_root: bool,
ignore_unhealthy_builders: bool,
) -> Self {
Self {
l2_client: Arc::new(l2_client),
Expand All @@ -71,6 +78,9 @@ where
payload_trace_context: Arc::new(PayloadTraceContext::new()),
execution_mode: initial_execution_mode,
probes,
external_state_root,
ignore_unhealthy_builders,
payload_to_fcu_request: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -121,7 +131,7 @@ where
.await;

// async call to builder to sync the builder node
if !self.execution_mode().is_disabled() {
if !self.execution_mode().is_disabled() && !self.should_skip_unhealthy_builder() {
let builder = self.builder_client.clone();
let new_payload_clone = new_payload.clone();
tokio::spawn(async move { builder.new_payload(new_payload_clone).await });
Expand Down Expand Up @@ -182,20 +192,84 @@ where
return RpcResult::Ok(None);
}

if self.should_skip_unhealthy_builder() {
info!(message = "builder is unhealthy, skipping get_payload call to builder");
return RpcResult::Ok(None);
}

// Get payload and validate with the local l2 client
tracing::Span::current().record("builder_has_payload", true);
info!(message = "builder has payload, calling get_payload on builder");

let payload = self.builder_client.get_payload(payload_id, version).await?;
let _ = self

if !self.external_state_root {
let _ = self
.l2_client
.new_payload(NewPayload::from(payload.clone()))
.await?;

return Ok(Some(payload));
}

let fcu_info = self
.payload_to_fcu_request
.lock()
.remove(&payload_id)
.unwrap()
.to_owned()
.clone();

let new_payload_attrs = match fcu_info.1.as_ref() {
Some(attrs) => OpPayloadAttributes {
payload_attributes: attrs.payload_attributes.clone(),
transactions: Some(payload.transactions()),
no_tx_pool: Some(true),
gas_limit: attrs.gas_limit,
eip_1559_params: attrs.eip_1559_params,
},
None => OpPayloadAttributes {
payload_attributes: payload.payload_attributes(),
transactions: Some(payload.transactions()),
no_tx_pool: Some(true),
gas_limit: None,
eip_1559_params: None,
},
};

let l2_result = self
.l2_client
.new_payload(NewPayload::from(payload.clone()))
.fork_choice_updated_v3(fcu_info.0, Some(new_payload_attrs))
.await?;

Ok(Some(payload))
if let Some(new_payload_id) = l2_result.payload_id {
debug!(
message = "sent FCU to l2 to calculate new state root",
"returned_payload_id" = %new_payload_id,
"old_payload_id" = %payload_id,
);
let l2_payload = self.l2_client.get_payload(new_payload_id, version).await;

match l2_payload {
Ok(new_payload) => {
debug!(
message = "received new state root payload from l2",
payload = ?new_payload,
);
return Ok(Some(new_payload));
}

Err(e) => {
error!(message = "error getting new state root payload from l2", error = %e);
return Err(e.into());
}
}
}

Ok(None)
};

let (l2_payload, builder_payload) = tokio::join!(l2_fut, builder_fut);
let (builder_payload, l2_payload) = tokio::join!(builder_fut, l2_fut);

// Evaluate the builder and l2 response and select the final payload
let (payload, context) = {
Expand Down Expand Up @@ -252,6 +326,7 @@ where
let inner_payload = ExecutionPayload::from(payload.clone());
let block_hash = inner_payload.block_hash();
let block_number = inner_payload.block_number();
let state_root = inner_payload.as_v1().state_root;

// Note: This log message is used by integration tests to track payload context.
// While not ideal to rely on log parsing, it provides a reliable way to verify behavior.
Expand All @@ -260,11 +335,16 @@ where
message = "returning block",
"hash" = %block_hash,
"number" = %block_number,
"state_root" = %state_root,
%context,
%payload_id,
);
Ok(payload)
}

fn should_skip_unhealthy_builder(&self) -> bool {
self.ignore_unhealthy_builders && !matches!(self.probes.health(), Health::Healthy)
}
}

impl<T> TryInto<RpcModule<()>> for RollupBoostServer<T>
Expand Down Expand Up @@ -357,6 +437,12 @@ where
return Ok(l2_fut.await?);
}

// If traffic to the unhealthy builder is not allowed and the builder is unhealthy,
if self.should_skip_unhealthy_builder() {
info!(message = "builder is unhealthy, skipping FCU to builder");
return Ok(l2_fut.await?);
}

let span = tracing::Span::current();
// If the fcu contains payload attributes and the tx pool is disabled,
// only forward the FCU to the default l2 client
Expand All @@ -378,6 +464,10 @@ where
span.id(),
)
.await;

self.payload_to_fcu_request
.lock()
.insert(payload_id, (fork_choice_state, payload_attributes));
}

// We always return the value from the l2 client
Expand All @@ -387,7 +477,7 @@ where
// to both the builder and the default l2 client
let builder_fut = self
.builder_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes);
.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone());

let (l2_result, builder_result) = tokio::join!(l2_fut, builder_fut);
let l2_response = l2_result?;
Expand All @@ -407,6 +497,10 @@ where
span.id(),
)
.await;

self.payload_to_fcu_request
.lock()
.insert(payload_id, (fork_choice_state, payload_attributes));
}

return Ok(l2_response);
Expand All @@ -416,15 +510,23 @@ where
// forward the fcu to the builder to keep it synced and immediately return the l2
// response without awaiting the builder
let builder_client = self.builder_client.clone();
let attrs_clone = payload_attributes.clone();
tokio::spawn(async move {
// It is not critical to wait for the builder response here
// During moments of high load, Op-node can send hundreds of FCU requests
// and we want to ensure that we don't block the main thread in those scenarios
builder_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes)
.fork_choice_updated_v3(fork_choice_state, attrs_clone)
.await
});
return Ok(l2_fut.await?);
let l2_response = l2_fut.await?;
if let Some(payload_id) = l2_response.payload_id {
self.payload_to_fcu_request
.lock()
.insert(payload_id, (fork_choice_state, payload_attributes));
}

return Ok(l2_response);
}
}

Expand Down Expand Up @@ -669,12 +771,17 @@ pub mod tests {
let (probe_layer, probes) = ProbeLayer::new();
let execution_mode = Arc::new(Mutex::new(ExecutionMode::Enabled));

// For tests, set initial health to Healthy since we don't run health checks
probes.set_health(Health::Healthy);

let rollup_boost = RollupBoostServer::new(
l2_client,
builder_client,
execution_mode.clone(),
None,
probes.clone(),
false,
true,
);

let module: RpcModule<()> = rollup_boost.try_into().unwrap();
Expand Down
Loading
Loading