Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 25 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions crates/rollup-boost/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ pub struct RollupBoostArgs {
#[arg(long, env)]
pub block_selection_policy: Option<BlockSelectionPolicy>,

/// Should we use the l2 client for computing state root
#[arg(long, env, default_value = "false")]
pub external_state_root: bool,

/// Allow all engine API calls to builder even when marked as unhealthy
/// This is default true assuming no builder CL set up
#[arg(long, env, default_value = "false")]
pub ignore_unhealthy_builders: bool,

#[clap(flatten)]
pub flashblocks: FlashblocksArgs,
}
Expand Down Expand Up @@ -160,6 +169,8 @@ impl RollupBoostArgs {
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
self.external_state_root,
self.ignore_unhealthy_builders,
);

let health_handle = rollup_boost
Expand All @@ -175,6 +186,8 @@ impl RollupBoostArgs {
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
self.external_state_root,
self.ignore_unhealthy_builders,
);

let health_handle = rollup_boost
Expand Down
26 changes: 22 additions & 4 deletions crates/rollup-boost/src/flashblocks/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use core::{
sync::atomic::{AtomicUsize, Ordering},
task::{Context, Poll},
};
use futures::{Sink, SinkExt};
use futures::{Sink, SinkExt, StreamExt};
use std::{io, net::TcpListener, sync::Arc};
use tokio::{
net::TcpStream,
Expand Down Expand Up @@ -153,10 +153,10 @@ async fn broadcast_loop(
) {
let mut term = term;
let mut blocks = blocks;
let mut stream = stream;
let Ok(peer_addr) = stream.get_ref().peer_addr() else {
return;
};
let (mut sink, mut stream_read) = stream.split();

loop {
tokio::select! {
Expand All @@ -168,15 +168,33 @@ async fn broadcast_loop(
}
}

// Handle incoming WebSocket messages (including pings)
msg = stream_read.next() => {
match msg {
Some(Ok(_)) => {
// Ignore all inbound frames.
// Tungstenite will auto-respond to Ping and handle Close internally.
}
Some(Err(e)) => {
tracing::debug!("WebSocket error from {peer_addr}: {e}");
break;
}
None => {
tracing::debug!("WebSocket stream ended for {peer_addr}");
break;
}
}
}

// Receive payloads from the broadcast channel
payload = blocks.recv() => match payload {
Ok(payload) => {
// Here you would typically send the payload to the WebSocket clients.
// For this example, we just increment the sent counter.
sent.fetch_add(1, Ordering::Relaxed);

tracing::info!("Broadcasted payload: {:?}", payload);
if let Err(e) = stream.send(Message::Text(payload)).await {
tracing::debug!("Broadcasted payload: {:?}", payload);
if let Err(e) = sink.send(Message::Text(payload)).await {
tracing::debug!("Closing flashblocks subscription for {peer_addr}: {e}");
break; // Exit the loop if sending fails
}
Expand Down
63 changes: 62 additions & 1 deletion crates/rollup-boost/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alloy_primitives::{B256, Bytes};
use futures::{StreamExt as _, stream};
use moka::future::Cache;

use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadId};
use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadAttributes, PayloadId};
use op_alloy_rpc_types_engine::{
OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
};
Expand Down Expand Up @@ -61,6 +61,67 @@ impl OpExecutionPayloadEnvelope {
.len(),
}
}

pub fn transactions(&self) -> Vec<Bytes> {
match self {
OpExecutionPayloadEnvelope::V3(payload) => payload
.execution_payload
.payload_inner
.payload_inner
.transactions
.clone(),
OpExecutionPayloadEnvelope::V4(payload) => payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.transactions
.clone(),
}
}

pub fn payload_attributes(&self) -> PayloadAttributes {
match self {
OpExecutionPayloadEnvelope::V3(payload) => PayloadAttributes {
timestamp: payload.execution_payload.payload_inner.timestamp(),
prev_randao: payload
.execution_payload
.payload_inner
.payload_inner
.prev_randao,
suggested_fee_recipient: payload
.execution_payload
.payload_inner
.payload_inner
.fee_recipient,
withdrawals: Some(payload.execution_payload.withdrawals().clone()),
parent_beacon_block_root: Some(payload.parent_beacon_block_root),
},
OpExecutionPayloadEnvelope::V4(payload) => PayloadAttributes {
timestamp: payload.execution_payload.payload_inner.timestamp(),
prev_randao: payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.prev_randao,
suggested_fee_recipient: payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.fee_recipient,
withdrawals: Some(
payload
.execution_payload
.payload_inner
.withdrawals()
.clone(),
),
parent_beacon_block_root: Some(payload.parent_beacon_block_root),
},
}
}
}

impl From<OpExecutionPayloadEnvelope> for ExecutionPayload {
Expand Down
2 changes: 2 additions & 0 deletions crates/rollup-boost/src/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use jsonrpsee::{
};
use parking_lot::Mutex;
use tower::{Layer, Service};
use tracing::info;

use crate::{Request, Response};

Expand Down Expand Up @@ -45,6 +46,7 @@ pub struct Probes {

impl Probes {
pub fn set_health(&self, value: Health) {
info!(target: "rollup_boost::probe", "Updating health probe to to {:?}", value);
*self.health.lock() = value;
}

Expand Down
Loading
Loading