Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/flashblocks-rpc/src/flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ fn try_decode_message(bytes: &[u8]) -> eyre::Result<FlashblocksPayloadV1> {
}

fn try_parse_message(bytes: &[u8]) -> eyre::Result<String> {
#[allow(clippy::collapsible_if)]
if let Ok(text) = String::from_utf8(bytes.to_vec()) {
if text.trim_start().starts_with("{") {
return Ok(text);
Expand Down
1 change: 1 addition & 0 deletions crates/flashblocks-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ where
debug!("get_balance: {:?}, {:?}", address, block_number);

let block_id = block_number.unwrap_or_default();
#[allow(clippy::collapsible_if)]
if block_id.is_pending() {
if let Some(balance) = self.flashblocks_api.get_balance(address).await {
return Ok(balance);
Expand Down
13 changes: 13 additions & 0 deletions crates/rollup-boost/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ pub struct RollupBoostArgs {
#[arg(long, env)]
pub block_selection_policy: Option<BlockSelectionPolicy>,

/// Should we use the l2 client for computing state root
#[arg(long, env, default_value = "false")]
pub external_state_root: bool,

/// Allow all engine API calls to builder even when marked as unhealthy
/// This is default true assuming no builder CL set up
#[arg(long, env, default_value = "false")]
pub ignore_unhealthy_builders: bool,

#[clap(flatten)]
pub flashblocks: FlashblocksArgs,
}
Expand Down Expand Up @@ -160,6 +169,8 @@ impl RollupBoostArgs {
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
self.external_state_root,
self.ignore_unhealthy_builders,
);

let health_handle = rollup_boost
Expand All @@ -175,6 +186,8 @@ impl RollupBoostArgs {
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
self.external_state_root,
self.ignore_unhealthy_builders,
);

let health_handle = rollup_boost
Expand Down
37 changes: 33 additions & 4 deletions crates/rollup-boost/src/flashblocks/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use core::{
sync::atomic::{AtomicUsize, Ordering},
task::{Context, Poll},
};
use futures::{Sink, SinkExt};
use futures::{Sink, SinkExt, StreamExt};
use std::{io, net::TcpListener, sync::Arc};
use tokio::{
net::TcpStream,
Expand Down Expand Up @@ -153,10 +153,11 @@ async fn broadcast_loop(
) {
let mut term = term;
let mut blocks = blocks;
let mut stream = stream;
let stream = stream;
let Ok(peer_addr) = stream.get_ref().peer_addr() else {
return;
};
let (mut sink, mut stream_read) = stream.split();

loop {
tokio::select! {
Expand All @@ -168,15 +169,43 @@ async fn broadcast_loop(
}
}

// Handle incoming WebSocket messages (including pings)
msg = stream_read.next() => {
match msg {
Some(Ok(Message::Ping(payload))) => {
// Respond to ping with pong
if let Err(e) = sink.send(Message::Pong(payload)).await {
tracing::debug!("Failed to send pong to {peer_addr}: {e}");
break;
}
}
Some(Ok(Message::Close(_))) => {
tracing::debug!("Client {peer_addr} closed connection");
break;
}
Some(Err(e)) => {
tracing::debug!("WebSocket error from {peer_addr}: {e}");
break;
}
None => {
tracing::debug!("WebSocket stream ended for {peer_addr}");
break;
}
_ => {
// Ignore other message types (Text, Binary, Pong)
}
}
}

// Receive payloads from the broadcast channel
payload = blocks.recv() => match payload {
Ok(payload) => {
// Here you would typically send the payload to the WebSocket clients.
// For this example, we just increment the sent counter.
sent.fetch_add(1, Ordering::Relaxed);

tracing::info!("Broadcasted payload: {:?}", payload);
if let Err(e) = stream.send(Message::Text(payload)).await {
tracing::debug!("Broadcasted payload: {:?}", payload);
if let Err(e) = sink.send(Message::Text(payload)).await {
tracing::debug!("Closing flashblocks subscription for {peer_addr}: {e}");
break; // Exit the loop if sending fails
}
Expand Down
63 changes: 62 additions & 1 deletion crates/rollup-boost/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alloy_primitives::{B256, Bytes};
use futures::{StreamExt as _, stream};
use moka::future::Cache;

use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadId};
use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadAttributes, PayloadId};
use op_alloy_rpc_types_engine::{
OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
};
Expand Down Expand Up @@ -61,6 +61,67 @@ impl OpExecutionPayloadEnvelope {
.len(),
}
}

pub fn transactions(&self) -> Vec<Bytes> {
match self {
OpExecutionPayloadEnvelope::V3(payload) => payload
.execution_payload
.payload_inner
.payload_inner
.transactions
.clone(),
OpExecutionPayloadEnvelope::V4(payload) => payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.transactions
.clone(),
}
}

pub fn payload_attributes(&self) -> PayloadAttributes {
match self {
OpExecutionPayloadEnvelope::V3(payload) => PayloadAttributes {
timestamp: payload.execution_payload.payload_inner.timestamp(),
prev_randao: payload
.execution_payload
.payload_inner
.payload_inner
.prev_randao,
suggested_fee_recipient: payload
.execution_payload
.payload_inner
.payload_inner
.fee_recipient,
withdrawals: Some(payload.execution_payload.withdrawals().clone()),
parent_beacon_block_root: Some(payload.parent_beacon_block_root),
},
OpExecutionPayloadEnvelope::V4(payload) => PayloadAttributes {
timestamp: payload.execution_payload.payload_inner.timestamp(),
prev_randao: payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.prev_randao,
suggested_fee_recipient: payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.fee_recipient,
withdrawals: Some(
payload
.execution_payload
.payload_inner
.withdrawals()
.clone(),
),
parent_beacon_block_root: Some(payload.parent_beacon_block_root),
},
}
}
}

impl From<OpExecutionPayloadEnvelope> for ExecutionPayload {
Expand Down
Loading
Loading