Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2024"
[dependencies]
op-alloy-rpc-types-engine = "0.12.0"
alloy-rpc-types-engine = "0.13.0"
alloy-rpc-types-eth = "0.13.0"
alloy-primitives = { version = "0.8.10", features = ["rand"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1.4"
Expand Down Expand Up @@ -51,7 +52,7 @@ rand = "0.9.0"
time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] }
op-alloy-consensus = "0.12.0"
alloy-eips = { version = "0.13.0", features = ["serde"] }
alloy-rpc-types-eth = "0.13.0"
alloy-consensus = {version = "0.13.0", features = ["serde"] }
anyhow = "1.0"
testcontainers = { version = "0.23.3" }
assert_cmd = "2.0.10"
Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/stress.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ run() {
# the transactions will be included in the canonical blocks and finalized.

# Figure out first the builder's JSON-RPC URL
ROLLUP_BOOST_SOCKET=$(kurtosis port print op-rollup-boost op-rollup-boost-1-op-kurtosis rpc)
OP_RETH_BUILDER_SOCKET=$(kurtosis port print op-rollup-boost op-el-builder-1-op-reth-op-node-op-kurtosis rpc)
ROLLUP_BOOST_SOCKET=$(kurtosis port print op-rollup-boost op-rollup-boost-2151908-1-op-kurtosis rpc)
OP_RETH_BUILDER_SOCKET=$(kurtosis port print op-rollup-boost op-el-builder-2151908-1-op-reth-op-node-op-kurtosis rpc)

# Private key with prefunded balance
PREFUNDED_PRIV_KEY=0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d
Expand Down
10 changes: 10 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ pub struct Args {
#[clap(flatten)]
pub l2_client: L2ClientArgs,

/// Duration in seconds between async health checks on the builder
#[arg(long, env, default_value = "60")]
pub health_check_interval: u64,

/// Max duration in seconds between the unsafe head block of the builder and the current time
#[arg(long, env, default_value = "5")]
pub max_unsafe_interval: u64,

/// Disable using the proposer to sync the builder node
#[arg(long, env, default_value = "false")]
pub no_boost_sync: bool,
Expand Down Expand Up @@ -164,6 +172,8 @@ impl Args {
boost_sync_enabled,
self.execution_mode,
probes,
self.health_check_interval,
self.max_unsafe_interval,
);

// Spawn the debug server
Expand Down
14 changes: 14 additions & 0 deletions src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use crate::client::auth::AuthLayer;
use crate::server::{
EngineApiClient, NewPayload, OpExecutionPayloadEnvelope, PayloadSource, Version,
};

use alloy_primitives::{B256, Bytes};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtError, JwtSecret,
PayloadId, PayloadStatus,
};
use alloy_rpc_types_eth::{Block, BlockNumberOrTag};
use clap::{Parser, arg};
use http::Uri;
use jsonrpsee::http_client::transport::HttpBackend;
Expand Down Expand Up @@ -322,6 +324,18 @@ impl RpcClient {
}
}
}

pub async fn get_block_by_number(
&self,
number: BlockNumberOrTag,
full: bool,
) -> ClientResult<Block> {
Ok(self
.auth_client
.get_block_by_number(number, full)
.await
.set_code()?)
}
}

/// Generates Clap argument structs with a prefix to create a unique namespace when specifying RPC client config via the CLI.
Expand Down
280 changes: 280 additions & 0 deletions src/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use alloy_rpc_types_eth::BlockNumberOrTag;
use tokio::{
task::JoinHandle,
time::{Instant, sleep_until},
};
use tracing::warn;

use crate::{Health, Probes, RpcClient};

pub struct HealthHandle {
pub probes: Arc<Probes>,
pub builder_client: Arc<RpcClient>,
pub health_check_interval: u64,
pub max_unsafe_interval: u64,
}

impl HealthHandle {
/// Periodically checks that the latest unsafe block timestamp is not older than the
/// the current time minus the max_unsafe_interval.
pub fn spawn(self) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let latest_unsafe = match self
.builder_client
.get_block_by_number(BlockNumberOrTag::Latest, false)
.await
{
Ok(block) => block,
Err(e) => {
warn!(target: "rollup_boost::health", "Failed to get unsafe block from builder client: {} - updating health status", e);
self.probes.set_health(Health::PartialContent);
sleep_until(
Instant::now() + Duration::from_secs(self.health_check_interval),
)
.await;
continue;
}
};

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();

if now - latest_unsafe.header.timestamp > self.max_unsafe_interval {
warn!(target: "rollup_boost::health", "Unsafe block timestamp is too old ({} seconds - updating health status)", now - latest_unsafe.header.timestamp);
self.probes.set_health(Health::PartialContent);
} else {
self.probes.set_health(Health::Healthy);
}

sleep_until(Instant::now() + Duration::from_secs(self.health_check_interval)).await;
}
})
}
}

#[cfg(test)]
mod tests {
use std::net::SocketAddr;

use alloy_consensus::Header;
use alloy_rpc_types_eth::{Block, Header as EthHeader, Transaction};

use http::Uri;
use http_body_util::BodyExt;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use reth_rpc_layer::JwtSecret;
use serde_json::json;
use tokio::net::TcpListener;

use super::*;
use crate::{PayloadSource, Probes};

pub struct MockHttpServer {
addr: SocketAddr,
join_handle: JoinHandle<()>,
}

impl Drop for MockHttpServer {
fn drop(&mut self) {
self.join_handle.abort();
}
}

impl MockHttpServer {
async fn serve<S>(
f: fn(hyper::Request<hyper::body::Incoming>, timestamp: u64) -> S,
timestamp: u64,
) -> eyre::Result<Self>
where
S: Future<Output = Result<hyper::Response<String>, hyper::Error>>
+ Send
+ Sync
+ 'static,
{
{
let listener = TcpListener::bind("0.0.0.0:0").await?;
let addr = listener.local_addr()?;

let handle = tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let io = TokioIo::new(stream);
tokio::spawn(async move {
if let Err(err) = hyper::server::conn::http1::Builder::new()
.serve_connection(
io,
service_fn(move |req| f(req, timestamp)),
)
.await
{
eprintln!("Error serving connection: {}", err);
}
});
}
Err(e) => eprintln!("Error accepting connection: {}", e),
}
}
});

Ok(Self {
addr,
join_handle: handle,
})
}
}
}

async fn handler(
req: hyper::Request<hyper::body::Incoming>,
block_timstamp: u64,
) -> Result<hyper::Response<String>, hyper::Error> {
let body_bytes = match req.into_body().collect().await {
Ok(buf) => buf.to_bytes(),
Err(_) => {
let error_response = json!({
"jsonrpc": "2.0",
"error": { "code": -32700, "message": "Failed to read request body" },
"id": null
});
return Ok(hyper::Response::new(error_response.to_string()));
}
};

let request_body: serde_json::Value = match serde_json::from_slice(&body_bytes) {
Ok(json) => json,
Err(_) => {
let error_response = json!({
"jsonrpc": "2.0",
"error": { "code": -32700, "message": "Invalid JSON format" },
"id": null
});
return Ok(hyper::Response::new(error_response.to_string()));
}
};

let method = request_body["method"].as_str().unwrap_or_default();

let mock_block = Block::<Transaction> {
header: EthHeader {
inner: Header {
timestamp: block_timstamp,
..Default::default()
},
..Default::default()
},
..Default::default()
};

let response = match method {
"eth_getBlockByNumber" => json!({
"jsonrpc": "2.0",
"result": mock_block,
"id": request_body["id"]
}),
_ => {
let error_response = json!({
"jsonrpc": "2.0",
"error": { "code": -32601, "message": "Method not found" },
"id": request_body["id"]
});
return Ok(hyper::Response::new(error_response.to_string()));
}
};

Ok(hyper::Response::new(response.to_string()))
}

#[tokio::test]
async fn test_health_check_healthy() -> eyre::Result<()> {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let probes = Arc::new(Probes::default());
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();

let builder = MockHttpServer::serve(handler, now).await.unwrap();
let builder_client = Arc::new(RpcClient::new(
format!("http://{}", builder.addr).parse::<Uri>()?,
JwtSecret::random(),
100,
PayloadSource::Builder,
)?);

let health_handle = HealthHandle {
probes: probes.clone(),
builder_client: builder_client.clone(),
health_check_interval: 60,
max_unsafe_interval: 5,
};

let _ = health_handle.spawn();
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(matches!(probes.health(), Health::Healthy));
Ok(())
}

#[tokio::test]
async fn test_health_check_exceeds_max_unsafe_interval() -> eyre::Result<()> {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let probes = Arc::new(Probes::default());
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
let builder = MockHttpServer::serve(handler, now - 10).await.unwrap();

let builder_client = Arc::new(RpcClient::new(
format!("http://{}", builder.addr).parse::<Uri>()?,
JwtSecret::random(),
100,
PayloadSource::Builder,
)?);

let health_handle = HealthHandle {
probes: probes.clone(),
builder_client: builder_client.clone(),
health_check_interval: 60,
max_unsafe_interval: 5,
};

let _ = health_handle.spawn();
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(matches!(probes.health(), Health::PartialContent));
Ok(())
}

#[tokio::test]
async fn test_health_check_service_unavailable() -> eyre::Result<()> {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let probes = Arc::new(Probes::default());
let builder_client = Arc::new(RpcClient::new(
"http://127.0.0.1:6000".parse::<Uri>()?,
JwtSecret::random(),
100,
PayloadSource::Builder,
)?);

let health_handle = HealthHandle {
probes: probes.clone(),
builder_client: builder_client.clone(),
health_check_interval: 60,
max_unsafe_interval: 5,
};

let _ = health_handle.spawn();
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(matches!(probes.health(), Health::PartialContent));
Ok(())
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ pub use tracing::*;

mod probe;
pub use probe::*;

mod health;
pub use health::*;
Loading
Loading