diff --git a/Cargo.lock b/Cargo.lock index 50dc06e5..4a688ec4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3775,6 +3775,7 @@ dependencies = [ name = "rollup-boost" version = "0.1.0" dependencies = [ + "alloy-consensus", "alloy-eips", "alloy-primitives", "alloy-rpc-types-engine", diff --git a/Cargo.toml b/Cargo.toml index 7784c333..2df4d56b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] op-alloy-rpc-types-engine = "0.12.0" alloy-rpc-types-engine = "0.13.0" +alloy-rpc-types-eth = "0.13.0" alloy-primitives = { version = "0.8.10", features = ["rand"] } tokio = { version = "1", features = ["full"] } tracing = "0.1.4" @@ -51,7 +52,7 @@ rand = "0.9.0" time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] } op-alloy-consensus = "0.12.0" alloy-eips = { version = "0.13.0", features = ["serde"] } -alloy-rpc-types-eth = "0.13.0" +alloy-consensus = {version = "0.13.0", features = ["serde"] } anyhow = "1.0" testcontainers = { version = "0.23.3" } assert_cmd = "2.0.10" diff --git a/scripts/ci/stress.sh b/scripts/ci/stress.sh index d6cec2dc..9543ae33 100755 --- a/scripts/ci/stress.sh +++ b/scripts/ci/stress.sh @@ -32,8 +32,8 @@ run() { # the transactions will be included in the canonical blocks and finalized. 
# Figure out first the builder's JSON-RPC URL - ROLLUP_BOOST_SOCKET=$(kurtosis port print op-rollup-boost op-rollup-boost-1-op-kurtosis rpc) - OP_RETH_BUILDER_SOCKET=$(kurtosis port print op-rollup-boost op-el-builder-1-op-reth-op-node-op-kurtosis rpc) + ROLLUP_BOOST_SOCKET=$(kurtosis port print op-rollup-boost op-rollup-boost-2151908-1-op-kurtosis rpc) + OP_RETH_BUILDER_SOCKET=$(kurtosis port print op-rollup-boost op-el-builder-2151908-1-op-reth-op-node-op-kurtosis rpc) # Private key with prefunded balance PREFUNDED_PRIV_KEY=0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d diff --git a/src/cli.rs b/src/cli.rs index db9b62f3..d6d9d349 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -27,6 +27,14 @@ pub struct Args { #[clap(flatten)] pub l2_client: L2ClientArgs, + /// Duration in seconds between async health checks on the builder + #[arg(long, env, default_value = "60")] + pub health_check_interval: u64, + + /// Max duration in seconds between the unsafe head block of the builder and the current time + #[arg(long, env, default_value = "5")] + pub max_unsafe_interval: u64, + /// Disable using the proposer to sync the builder node #[arg(long, env, default_value = "false")] pub no_boost_sync: bool, @@ -164,6 +172,8 @@ impl Args { boost_sync_enabled, self.execution_mode, probes, + self.health_check_interval, + self.max_unsafe_interval, ); // Spawn the debug server diff --git a/src/client/rpc.rs b/src/client/rpc.rs index ea3a610d..4ca8106b 100644 --- a/src/client/rpc.rs +++ b/src/client/rpc.rs @@ -2,11 +2,13 @@ use crate::client::auth::AuthLayer; use crate::server::{ EngineApiClient, NewPayload, OpExecutionPayloadEnvelope, PayloadSource, Version, }; + use alloy_primitives::{B256, Bytes}; use alloy_rpc_types_engine::{ ExecutionPayload, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtError, JwtSecret, PayloadId, PayloadStatus, }; +use alloy_rpc_types_eth::{Block, BlockNumberOrTag}; use clap::{Parser, arg}; use http::Uri; use 
jsonrpsee::http_client::transport::HttpBackend; @@ -322,6 +324,18 @@ impl RpcClient { } } } + + pub async fn get_block_by_number( + &self, + number: BlockNumberOrTag, + full: bool, + ) -> ClientResult { + Ok(self + .auth_client + .get_block_by_number(number, full) + .await + .set_code()?) + } } /// Generates Clap argument structs with a prefix to create a unique namespace when specifying RPC client config via the CLI. diff --git a/src/health.rs b/src/health.rs new file mode 100644 index 00000000..af12d107 --- /dev/null +++ b/src/health.rs @@ -0,0 +1,280 @@ +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use alloy_rpc_types_eth::BlockNumberOrTag; +use tokio::{ + task::JoinHandle, + time::{Instant, sleep_until}, +}; +use tracing::warn; + +use crate::{Health, Probes, RpcClient}; + +pub struct HealthHandle { + pub probes: Arc, + pub builder_client: Arc, + pub health_check_interval: u64, + pub max_unsafe_interval: u64, +} + +impl HealthHandle { + /// Periodically checks that the latest unsafe block timestamp is not older than + /// the current time minus the max_unsafe_interval. 
+ pub fn spawn(self) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + let latest_unsafe = match self + .builder_client + .get_block_by_number(BlockNumberOrTag::Latest, false) + .await + { + Ok(block) => block, + Err(e) => { + warn!(target: "rollup_boost::health", "Failed to get unsafe block from builder client: {} - updating health status", e); + self.probes.set_health(Health::PartialContent); + sleep_until( + Instant::now() + Duration::from_secs(self.health_check_interval), + ) + .await; + continue; + } + }; + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + + if now.saturating_sub(latest_unsafe.header.timestamp) > self.max_unsafe_interval { // saturating: a head stamped ahead of the local clock reads as age 0 instead of underflowing u64 + warn!(target: "rollup_boost::health", "Unsafe block timestamp is too old ({} seconds - updating health status)", now.saturating_sub(latest_unsafe.header.timestamp)); + self.probes.set_health(Health::PartialContent); + } else { + self.probes.set_health(Health::Healthy); + } + + sleep_until(Instant::now() + Duration::from_secs(self.health_check_interval)).await; + } + }) + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + + use alloy_consensus::Header; + use alloy_rpc_types_eth::{Block, Header as EthHeader, Transaction}; + + use http::Uri; + use http_body_util::BodyExt; + use hyper::service::service_fn; + use hyper_util::rt::TokioIo; + use reth_rpc_layer::JwtSecret; + use serde_json::json; + use tokio::net::TcpListener; + + use super::*; + use crate::{PayloadSource, Probes}; + + pub struct MockHttpServer { + addr: SocketAddr, + join_handle: JoinHandle<()>, + } + + impl Drop for MockHttpServer { + fn drop(&mut self) { + self.join_handle.abort(); + } + } + + impl MockHttpServer { + async fn serve( + f: fn(hyper::Request, timestamp: u64) -> S, + timestamp: u64, + ) -> eyre::Result + where + S: Future, hyper::Error>> + + Send + + Sync + + 'static, + { + { + let listener = TcpListener::bind("0.0.0.0:0").await?; + let addr = listener.local_addr()?; + + let handle = 
tokio::spawn(async move { + loop { + match listener.accept().await { + Ok((stream, _)) => { + let io = TokioIo::new(stream); + tokio::spawn(async move { + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection( + io, + service_fn(move |req| f(req, timestamp)), + ) + .await + { + eprintln!("Error serving connection: {}", err); + } + }); + } + Err(e) => eprintln!("Error accepting connection: {}", e), + } + } + }); + + Ok(Self { + addr, + join_handle: handle, + }) + } + } + } + + async fn handler( + req: hyper::Request, + block_timstamp: u64, + ) -> Result, hyper::Error> { + let body_bytes = match req.into_body().collect().await { + Ok(buf) => buf.to_bytes(), + Err(_) => { + let error_response = json!({ + "jsonrpc": "2.0", + "error": { "code": -32700, "message": "Failed to read request body" }, + "id": null + }); + return Ok(hyper::Response::new(error_response.to_string())); + } + }; + + let request_body: serde_json::Value = match serde_json::from_slice(&body_bytes) { + Ok(json) => json, + Err(_) => { + let error_response = json!({ + "jsonrpc": "2.0", + "error": { "code": -32700, "message": "Invalid JSON format" }, + "id": null + }); + return Ok(hyper::Response::new(error_response.to_string())); + } + }; + + let method = request_body["method"].as_str().unwrap_or_default(); + + let mock_block = Block:: { + header: EthHeader { + inner: Header { + timestamp: block_timstamp, + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }; + + let response = match method { + "eth_getBlockByNumber" => json!({ + "jsonrpc": "2.0", + "result": mock_block, + "id": request_body["id"] + }), + _ => { + let error_response = json!({ + "jsonrpc": "2.0", + "error": { "code": -32601, "message": "Method not found" }, + "id": request_body["id"] + }); + return Ok(hyper::Response::new(error_response.to_string())); + } + }; + + Ok(hyper::Response::new(response.to_string())) + } + + #[tokio::test] + async fn test_health_check_healthy() -> 
eyre::Result<()> { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let probes = Arc::new(Probes::default()); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + + let builder = MockHttpServer::serve(handler, now).await.unwrap(); + let builder_client = Arc::new(RpcClient::new( + format!("http://{}", builder.addr).parse::()?, + JwtSecret::random(), + 100, + PayloadSource::Builder, + )?); + + let health_handle = HealthHandle { + probes: probes.clone(), + builder_client: builder_client.clone(), + health_check_interval: 60, + max_unsafe_interval: 5, + }; + + let _ = health_handle.spawn(); + tokio::time::sleep(Duration::from_secs(2)).await; + assert!(matches!(probes.health(), Health::Healthy)); + Ok(()) + } + + #[tokio::test] + async fn test_health_check_exceeds_max_unsafe_interval() -> eyre::Result<()> { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let probes = Arc::new(Probes::default()); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + let builder = MockHttpServer::serve(handler, now - 10).await.unwrap(); + + let builder_client = Arc::new(RpcClient::new( + format!("http://{}", builder.addr).parse::()?, + JwtSecret::random(), + 100, + PayloadSource::Builder, + )?); + + let health_handle = HealthHandle { + probes: probes.clone(), + builder_client: builder_client.clone(), + health_check_interval: 60, + max_unsafe_interval: 5, + }; + + let _ = health_handle.spawn(); + tokio::time::sleep(Duration::from_secs(2)).await; + assert!(matches!(probes.health(), Health::PartialContent)); + Ok(()) + } + + #[tokio::test] + async fn test_health_check_service_unavailable() -> eyre::Result<()> { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let probes = Arc::new(Probes::default()); + let builder_client = Arc::new(RpcClient::new( + "http://127.0.0.1:6000".parse::()?, + JwtSecret::random(), + 100, + 
PayloadSource::Builder, + )?); + + let health_handle = HealthHandle { + probes: probes.clone(), + builder_client: builder_client.clone(), + health_check_interval: 60, + max_unsafe_interval: 5, + }; + + let _ = health_handle.spawn(); + tokio::time::sleep(Duration::from_secs(2)).await; + assert!(matches!(probes.health(), Health::PartialContent)); + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 2cf2b9de..37e4ecfa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,3 +24,6 @@ pub use tracing::*; mod probe; pub use probe::*; + +mod health; +pub use health::*; diff --git a/src/server.rs b/src/server.rs index 91d3371f..22aba1b1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,9 @@ +use crate::HealthHandle; use crate::client::rpc::RpcClient; use crate::debug_api::DebugServer; use crate::probe::{Health, Probes}; use alloy_primitives::{B256, Bytes}; +use alloy_rpc_types_eth::{Block, BlockNumberOrTag}; use metrics::counter; use moka::sync::Cache; use opentelemetry::trace::SpanKind; @@ -21,7 +23,7 @@ use op_alloy_rpc_types_engine::{ OpPayloadAttributes, }; use serde::{Deserialize, Serialize}; - +use tokio::task::JoinHandle; use tracing::{debug, info, instrument}; use jsonrpsee::proc_macros::rpc; @@ -123,12 +125,12 @@ impl ExecutionMode { } } -#[derive(Clone)] pub struct RollupBoostServer { pub l2_client: Arc, pub builder_client: Arc, pub boost_sync: bool, pub payload_trace_context: Arc, + health_handle: JoinHandle<()>, execution_mode: Arc>, probes: Arc, } @@ -140,7 +142,17 @@ impl RollupBoostServer { boost_sync: bool, initial_execution_mode: ExecutionMode, probes: Arc, + health_check_interval: u64, + max_unsafe_interval: u64, ) -> Self { + let health_handle = HealthHandle { + probes: probes.clone(), + builder_client: Arc::new(builder_client.clone()), + health_check_interval, + max_unsafe_interval, + } + .spawn(); + Self { l2_client: Arc::new(l2_client), builder_client: Arc::new(builder_client), @@ -148,6 +160,7 @@ impl RollupBoostServer { payload_trace_context: 
Arc::new(PayloadTraceContext::new()), execution_mode: Arc::new(Mutex::new(initial_execution_mode)), probes, + health_handle, } } @@ -160,6 +173,10 @@ impl RollupBoostServer { pub fn execution_mode(&self) -> ExecutionMode { *self.execution_mode.lock() } + + pub fn health_handle(&self) -> &JoinHandle<()> { + &self.health_handle + } } impl TryInto> for RollupBoostServer { @@ -167,7 +184,7 @@ impl TryInto> for RollupBoostServer { fn try_into(self) -> Result, Self::Error> { let mut module: RpcModule<()> = RpcModule::new(()); - module.merge(EngineApiServer::into_rpc(self.clone()))?; + module.merge(EngineApiServer::into_rpc(self))?; for method in module.method_names() { info!(?method, "method registered"); @@ -203,22 +220,22 @@ impl PayloadSource { } } -#[rpc(server, client, namespace = "engine")] +#[rpc(server, client)] pub trait EngineApi { - #[method(name = "forkchoiceUpdatedV3")] + #[method(name = "engine_forkchoiceUpdatedV3")] async fn fork_choice_updated_v3( &self, fork_choice_state: ForkchoiceState, payload_attributes: Option, ) -> RpcResult; - #[method(name = "getPayloadV3")] + #[method(name = "engine_getPayloadV3")] async fn get_payload_v3( &self, payload_id: PayloadId, ) -> RpcResult; - #[method(name = "newPayloadV3")] + #[method(name = "engine_newPayloadV3")] async fn new_payload_v3( &self, payload: ExecutionPayloadV3, @@ -226,13 +243,13 @@ pub trait EngineApi { parent_beacon_block_root: B256, ) -> RpcResult; - #[method(name = "getPayloadV4")] + #[method(name = "engine_getPayloadV4")] async fn get_payload_v4( &self, payload_id: PayloadId, ) -> RpcResult; - #[method(name = "newPayloadV4")] + #[method(name = "engine_newPayloadV4")] async fn new_payload_v4( &self, payload: OpExecutionPayloadV4, @@ -240,6 +257,9 @@ pub trait EngineApi { parent_beacon_block_root: B256, execution_requests: Vec, ) -> RpcResult; + + #[method(name = "eth_getBlockByNumber")] + async fn get_block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult; } #[async_trait] @@ 
-410,6 +430,10 @@ impl EngineApiServer for RollupBoostServer { })) .await } + + async fn get_block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult { + Ok(self.l2_client.get_block_by_number(number, full).await?) + } } #[derive(Debug, Clone)] @@ -759,6 +783,8 @@ mod tests { boost_sync, ExecutionMode::Enabled, probes, + 60, + 5, ); let module: RpcModule<()> = rollup_boost.try_into().unwrap();