diff --git a/Cargo.lock b/Cargo.lock index 4d2e0076..4a688ec4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1139,6 +1139,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.0" @@ -1500,6 +1510,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" @@ -1676,6 +1695,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -2052,6 +2086,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.11" @@ -2809,6 +2859,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -2952,12 +3019,50 @@ dependencies = [ "thiserror 2.0.12", ] +[[package]] +name = "openssl" +version = "0.10.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "openssl-probe" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-sys" +version = "0.9.107" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8288979acd84749c744a9014b4382d42b8f7b2592847b5afb2ed29e5d16ede07" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.28.0" @@ -3581,26 +3686,34 @@ checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2", "http", "http-body", "http-body-util", "hyper", + "hyper-rustls", + "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", + "system-configuration", "tokio", + "tokio-native-tls", "tower 0.5.2", "tower-service", "url", @@ -3662,6 +3775,7 @@ dependencies = [ name = "rollup-boost" version = "0.1.0" dependencies = [ + "alloy-consensus", "alloy-eips", "alloy-primitives", "alloy-rpc-types-engine", @@ -3693,6 +3807,7 @@ dependencies = [ "paste", "predicates", "rand 0.9.0", + "reqwest", "reth-rpc-layer", "rustls", "serde", @@ -3844,7 +3959,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.2.0", ] [[package]] @@ -3868,7 +3983,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5467026f437b4cb2a533865eaa73eb840019a0916f4b9ec563c6e617e086c9" dependencies = [ - "core-foundation", + "core-foundation 0.10.0", "core-foundation-sys", "jni", "log", @@ -3877,7 +3992,7 @@ dependencies = [ "rustls-native-certs", "rustls-platform-verifier-android", "rustls-webpki", - "security-framework", + "security-framework 3.2.0", "security-framework-sys", "webpki-root-certs", "windows-sys 0.59.0", @@ -3970,6 +4085,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework" version = "3.2.0" @@ -3977,7 +4105,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ "bitflags 2.9.0", - "core-foundation", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -4389,6 +4517,27 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -4587,6 +4736,16 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" @@ -4981,6 +5140,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index ce177d89..2df4d56b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] op-alloy-rpc-types-engine = "0.12.0" alloy-rpc-types-engine = "0.13.0" +alloy-rpc-types-eth = "0.13.0" alloy-primitives = { version = "0.8.10", features = ["rand"] } tokio = { version = "1", features = ["full"] } tracing = "0.1.4" @@ -18,7 +19,10 @@ moka = { version = "0.12.10", features = ["sync"] } http = "1.1.0" dotenv = "0.15.0" tower = "0.4.13" -tower-http = { version = "0.5.2", features = ["decompression-full"] } +tower-http = { version = "0.5.2", features = [ + "decompression-full", + "sensitive-headers", +] } http-body-util = "0.1.2" hyper = { version = "1.4.1", features = ["full"] } hyper-util = { version = "0.1", features = ["full"] } @@ -48,7 +52,7 @@ rand = "0.9.0" time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] } op-alloy-consensus = "0.12.0" alloy-eips = { version = "0.13.0", features = ["serde"] } -alloy-rpc-types-eth = "0.13.0" +alloy-consensus = {version = "0.13.0", features = ["serde"] } anyhow = "1.0" testcontainers = { version = "0.23.3" } assert_cmd = "2.0.10" @@ -57,6 +61,7 @@ tokio-util = { version = "0.7.13" } bytes = "1.2" reth-rpc-layer = { git = "https://github.com/paradigmxyz/reth.git", rev = "v1.3.7" } ctor = "0.4.1" +reqwest = "0.12.15" [[bin]] name = "rollup-boost" diff --git a/docs/running-rollup-boost.md b/docs/running-rollup-boost.md index 83398b8d..a540c8ba 100644 --- a/docs/running-rollup-boost.md +++ b/docs/running-rollup-boost.md @@ -28,6 +28,20 @@ While this does not ensure high availability for the builder, the chain will hav ![rollup-boost-op-conductor](../assets/rollup-boost-op-conductor.png) +### Health Checks + +`rollup-boost` supports the standard array of kubernetes probes: + +- `/healthz` Returns various status codes to communicate `rollup-boost` health + - 200 OK - The builder is producing blocks + - 206 Partial Content - The l2 is producing blocks, but the builder is not + - 503 Service Unavailable - Neither the l2 or the builder is producing blocks +`op-conductor` should eventually be able to use this signal to switch to a different sequencer in an HA sequencer setup. In a future upgrade to `op-conductor`, A sequencer leader with a healthy (200 OK) EL (`rollup-boost` in our case) could be selected preferentially over one with an unhealthy (206 or 503) EL. If no ELs are healthy, then we can fallback to an EL which is responding with `206 Partial Content`. + +- `/readyz` Used by kubernetes to determine if the service is ready to accept traffic. Should always respond with `200 OK` + +- `/livez` determines wether or not `rollup-boost` is live (running and not deadlocked) and responding to requests. If `rollup-boost` fails to respond, kubernetes can use this as a signal to restart the pod. Should always respond with `200 OK` + ## Observability To check if the rollup-boost server is running, you can check the health endpoint: diff --git a/scripts/ci/kurtosis-params.yaml b/scripts/ci/kurtosis-params.yaml index 6a0678b3..873b8991 100644 --- a/scripts/ci/kurtosis-params.yaml +++ b/scripts/ci/kurtosis-params.yaml @@ -14,7 +14,6 @@ optimism_package: fjord_time_offset: 0 granite_time_offset: 0 isthmus_time_offset: 5 - fund_dev_accounts: true mev_params: rollup_boost_image: "flashbots/rollup-boost:develop" additional_services: diff --git a/scripts/ci/stress.sh b/scripts/ci/stress.sh index d6cec2dc..9543ae33 100755 --- a/scripts/ci/stress.sh +++ b/scripts/ci/stress.sh @@ -32,8 +32,8 @@ run() { # the transactions will be included in the canonical blocks and finalized. # Figure out first the builder's JSON-RPC URL - ROLLUP_BOOST_SOCKET=$(kurtosis port print op-rollup-boost op-rollup-boost-1-op-kurtosis rpc) - OP_RETH_BUILDER_SOCKET=$(kurtosis port print op-rollup-boost op-el-builder-1-op-reth-op-node-op-kurtosis rpc) + ROLLUP_BOOST_SOCKET=$(kurtosis port print op-rollup-boost op-rollup-boost-2151908-1-op-kurtosis rpc) + OP_RETH_BUILDER_SOCKET=$(kurtosis port print op-rollup-boost op-el-builder-2151908-1-op-reth-op-node-op-kurtosis rpc) # Private key with prefunded balance PREFUNDED_PRIV_KEY=0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d diff --git a/src/cli.rs b/src/cli.rs index 54291fc1..d6d9d349 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -11,6 +11,7 @@ use crate::{ DebugClient, PayloadSource, ProxyLayer, RollupBoostServer, RpcClient, client::rpc::{BuilderArgs, L2ClientArgs}, init_metrics, init_tracing, + probe::ProbeLayer, server::ExecutionMode, }; @@ -26,6 +27,14 @@ pub struct Args { #[clap(flatten)] pub l2_client: L2ClientArgs, + /// Duration in seconds between async health checks on the builder + #[arg(long, env, default_value = "60")] + pub health_check_interval: u64, + + /// Max duration in seconds between the unsafe head block of the builder and the current time + #[arg(long, env, default_value = "5")] + pub max_unsafe_interval: u64, + /// Disable using the proposer to sync the builder node #[arg(long, env, default_value = "false")] pub no_boost_sync: bool, @@ -155,11 +164,16 @@ impl Args { info!("Boost sync enabled"); } + let (probe_layer, probes) = ProbeLayer::new(); + let rollup_boost = RollupBoostServer::new( l2_client, builder_client, boost_sync_enabled, self.execution_mode, + probes, + self.health_check_interval, + self.max_unsafe_interval, ); // Spawn the debug server @@ -170,12 +184,15 @@ impl Args { // Build and start the server info!("Starting server on :{}", self.rpc_port); - let http_middleware = tower::ServiceBuilder::new().layer(ProxyLayer::new( - l2_client_args.l2_url, - l2_auth_jwt, - builder_args.builder_url, - builder_auth_jwt, - )); + let http_middleware = + tower::ServiceBuilder::new() + .layer(probe_layer) + .layer(ProxyLayer::new( + l2_client_args.l2_url, + l2_auth_jwt, + builder_args.builder_url, + builder_auth_jwt, + )); let server = Server::builder() .set_http_middleware(http_middleware) diff --git a/src/client/auth.rs b/src/client/auth.rs index 86723fc4..4446fd60 100644 --- a/src/client/auth.rs +++ b/src/client/auth.rs @@ -2,46 +2,51 @@ use alloy_rpc_types_engine::{Claims, JwtSecret}; use http::{HeaderValue, header::AUTHORIZATION}; use std::{ + iter::once, task::{Context, Poll}, time::{Duration, SystemTime, UNIX_EPOCH}, }; use tower::{Layer, Service}; +use tower_http::sensitive_headers::{SetSensitiveRequestHeaders, SetSensitiveRequestHeadersLayer}; + +pub type Auth = AuthService>; /// A layer that adds a new JWT token to every request using `AuthClientService`. #[derive(Clone, Debug)] -pub struct AuthClientLayer { +pub struct AuthLayer { secret: JwtSecret, } -impl AuthClientLayer { +impl AuthLayer { /// Create a new `AuthClientLayer` with the given `secret`. pub const fn new(secret: JwtSecret) -> Self { Self { secret } } } -impl Layer for AuthClientLayer { - type Service = AuthClientService; +impl Layer for AuthLayer { + type Service = AuthService>; fn layer(&self, inner: S) -> Self::Service { - AuthClientService::new(self.secret, inner) + let inner = SetSensitiveRequestHeadersLayer::new(once(AUTHORIZATION)).layer(inner); + AuthService::new(self.secret, inner) } } /// Automatically authenticates every client request with the given `secret`. #[derive(Debug, Clone)] -pub struct AuthClientService { +pub struct AuthService { secret: JwtSecret, inner: S, } -impl AuthClientService { +impl AuthService { const fn new(secret: JwtSecret, inner: S) -> Self { Self { secret, inner } } } -impl Service> for AuthClientService +impl Service> for AuthService where S: Service>, B: std::fmt::Debug, diff --git a/src/client/http.rs b/src/client/http.rs index 81fe4844..2d8118bd 100644 --- a/src/client/http.rs +++ b/src/client/http.rs @@ -1,3 +1,4 @@ +use crate::client::auth::AuthLayer; use crate::server::PayloadSource; use alloy_rpc_types_engine::JwtSecret; use http::Uri; @@ -13,11 +14,13 @@ use tower::{Service as _, ServiceBuilder, ServiceExt}; use tower_http::decompression::{Decompression, DecompressionLayer}; use tracing::{debug, error, instrument}; -use super::auth::{AuthClientLayer, AuthClientService}; +use super::auth::Auth; + +pub type HttpClientService = Decompression, HttpBody>>>; #[derive(Clone, Debug)] pub struct HttpClient { - client: Decompression, HttpBody>>>, + client: HttpClientService, url: Uri, target: PayloadSource, } @@ -36,7 +39,7 @@ impl HttpClient { let client = ServiceBuilder::new() .layer(DecompressionLayer::new()) - .layer(AuthClientLayer::new(secret)) + .layer(AuthLayer::new(secret)) .service(client); Self { diff --git a/src/client/rpc.rs b/src/client/rpc.rs index 37a7a786..4ca8106b 100644 --- a/src/client/rpc.rs +++ b/src/client/rpc.rs @@ -1,12 +1,14 @@ -use crate::client::auth::{AuthClientLayer, AuthClientService}; +use crate::client::auth::AuthLayer; use crate::server::{ EngineApiClient, NewPayload, OpExecutionPayloadEnvelope, PayloadSource, Version, }; + use alloy_primitives::{B256, Bytes}; use alloy_rpc_types_engine::{ ExecutionPayload, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtError, JwtSecret, PayloadId, PayloadStatus, }; +use alloy_rpc_types_eth::{Block, BlockNumberOrTag}; use clap::{Parser, arg}; use http::Uri; use jsonrpsee::http_client::transport::HttpBackend; @@ -23,6 +25,10 @@ use std::time::Duration; use thiserror::Error; use tracing::{error, info, instrument}; +use super::auth::Auth; + +pub type RpcClientService = HttpClient>; + const INTERNAL_ERROR: i32 = 13; pub(crate) type ClientResult = Result; @@ -96,7 +102,7 @@ impl From for ErrorObjectOwned { #[derive(Clone)] pub struct RpcClient { /// Handles requests to the authenticated Engine API (requires JWT authentication) - auth_client: HttpClient>, + auth_client: RpcClientService, /// Uri of the RPC server for authenticated Engine API calls auth_rpc: Uri, /// The source of the payload @@ -111,7 +117,7 @@ impl RpcClient { timeout: u64, payload_source: PayloadSource, ) -> Result { - let auth_layer = AuthClientLayer::new(auth_rpc_jwt_secret); + let auth_layer = AuthLayer::new(auth_rpc_jwt_secret); let auth_client = HttpClientBuilder::new() .set_http_middleware(tower::ServiceBuilder::new().layer(auth_layer)) .request_timeout(Duration::from_millis(timeout)) @@ -318,6 +324,18 @@ impl RpcClient { } } } + + pub async fn get_block_by_number( + &self, + number: BlockNumberOrTag, + full: bool, + ) -> ClientResult { + Ok(self + .auth_client + .get_block_by_number(number, full) + .await + .set_code()?) + } } /// Generates Clap argument structs with a prefix to create a unique namespace when specifying RPC client config via the CLI. @@ -357,13 +375,10 @@ pub mod tests { use jsonrpsee::core::client::ClientT; use parking_lot::Mutex; - use crate::client::auth::AuthClientService; use crate::server::PayloadSource; use alloy_rpc_types_engine::JwtSecret; use jsonrpsee::RpcModule; - use jsonrpsee::http_client::HttpClient; use jsonrpsee::http_client::transport::Error as TransportError; - use jsonrpsee::http_client::transport::HttpBackend; use jsonrpsee::{ core::ClientError, rpc_params, @@ -430,10 +445,7 @@ pub mod tests { )); } - async fn send_request( - client: HttpClient>, - port: u16, - ) -> Result { + async fn send_request(client: RpcClientService, port: u16) -> Result { let server = spawn_server(port).await; let response = client diff --git a/src/health.rs b/src/health.rs index 9615c15c..af12d107 100644 --- a/src/health.rs +++ b/src/health.rs @@ -1,67 +1,280 @@ use std::{ - pin::Pin, - task::{Context, Poll}, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, }; -use futures::FutureExt as _; -use jsonrpsee::{ - core::BoxError, - http_client::{HttpBody, HttpRequest, HttpResponse}, +use alloy_rpc_types_eth::BlockNumberOrTag; +use tokio::{ + task::JoinHandle, + time::{Instant, sleep_until}, }; -use tower::{Layer, Service, util::Either}; +use tracing::warn; -/// A [`Layer`] that filters out /healthz requests and responds with a 200 OK. -#[derive(Clone, Debug)] -pub struct HealthLayer; +use crate::{Health, Probes, RpcClient}; -impl Layer for HealthLayer { - type Service = HealthService; +pub struct HealthHandle { + pub probes: Arc, + pub builder_client: Arc, + pub health_check_interval: u64, + pub max_unsafe_interval: u64, +} + +impl HealthHandle { + /// Periodically checks that the latest unsafe block timestamp is not older than the + /// the current time minus the max_unsafe_interval. + pub fn spawn(self) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + let latest_unsafe = match self + .builder_client + .get_block_by_number(BlockNumberOrTag::Latest, false) + .await + { + Ok(block) => block, + Err(e) => { + warn!(target: "rollup_boost::health", "Failed to get unsafe block from builder client: {} - updating health status", e); + self.probes.set_health(Health::PartialContent); + sleep_until( + Instant::now() + Duration::from_secs(self.health_check_interval), + ) + .await; + continue; + } + }; + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); - fn layer(&self, inner: S) -> Self::Service { - HealthService { inner } + if now - latest_unsafe.header.timestamp > self.max_unsafe_interval { + warn!(target: "rollup_boost::health", "Unsafe block timestamp is too old ({} seconds - updating health status)", now - latest_unsafe.header.timestamp); + self.probes.set_health(Health::PartialContent); + } else { + self.probes.set_health(Health::Healthy); + } + + sleep_until(Instant::now() + Duration::from_secs(self.health_check_interval)).await; + } + }) } } -#[derive(Clone, Debug)] -pub struct HealthService { - inner: S, -} +#[cfg(test)] +mod tests { + use std::net::SocketAddr; -impl Service> for HealthService -where - S: Service, Response = HttpResponse> + Send + Sync + Clone + 'static, - S::Response: 'static, - S::Error: Into + 'static, - S::Future: Send + 'static, -{ - type Response = HttpResponse; - type Error = BoxError; - type Future = Either< - Pin> + Send + 'static>>, - S::Future, - >; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx).map_err(Into::into) + use alloy_consensus::Header; + use alloy_rpc_types_eth::{Block, Header as EthHeader, Transaction}; + + use http::Uri; + use http_body_util::BodyExt; + use hyper::service::service_fn; + use hyper_util::rt::TokioIo; + use reth_rpc_layer::JwtSecret; + use serde_json::json; + use tokio::net::TcpListener; + + use super::*; + use crate::{PayloadSource, Probes}; + + pub struct MockHttpServer { + addr: SocketAddr, + join_handle: JoinHandle<()>, } - fn call(&mut self, request: HttpRequest) -> Self::Future { - if request.uri().path() == "/healthz" { - Either::A(Self::healthz().boxed()) - } else { - Either::B(self.inner.call(request)) + impl Drop for MockHttpServer { + fn drop(&mut self) { + self.join_handle.abort(); } } -} -impl HealthService -where - S: Service, Response = HttpResponse> + Send + Sync + Clone + 'static, - S::Response: 'static, - S::Error: Into + 'static, - S::Future: Send + 'static, -{ - async fn healthz() -> Result { - Ok(HttpResponse::new(HttpBody::from("OK"))) + impl MockHttpServer { + async fn serve( + f: fn(hyper::Request, timestamp: u64) -> S, + timestamp: u64, + ) -> eyre::Result + where + S: Future, hyper::Error>> + + Send + + Sync + + 'static, + { + { + let listener = TcpListener::bind("0.0.0.0:0").await?; + let addr = listener.local_addr()?; + + let handle = tokio::spawn(async move { + loop { + match listener.accept().await { + Ok((stream, _)) => { + let io = TokioIo::new(stream); + tokio::spawn(async move { + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection( + io, + service_fn(move |req| f(req, timestamp)), + ) + .await + { + eprintln!("Error serving connection: {}", err); + } + }); + } + Err(e) => eprintln!("Error accepting connection: {}", e), + } + } + }); + + Ok(Self { + addr, + join_handle: handle, + }) + } + } + } + + async fn handler( + req: hyper::Request, + block_timstamp: u64, + ) -> Result, hyper::Error> { + let body_bytes = match req.into_body().collect().await { + Ok(buf) => buf.to_bytes(), + Err(_) => { + let error_response = json!({ + "jsonrpc": "2.0", + "error": { "code": -32700, "message": "Failed to read request body" }, + "id": null + }); + return Ok(hyper::Response::new(error_response.to_string())); + } + }; + + let request_body: serde_json::Value = match serde_json::from_slice(&body_bytes) { + Ok(json) => json, + Err(_) => { + let error_response = json!({ + "jsonrpc": "2.0", + "error": { "code": -32700, "message": "Invalid JSON format" }, + "id": null + }); + return Ok(hyper::Response::new(error_response.to_string())); + } + }; + + let method = request_body["method"].as_str().unwrap_or_default(); + + let mock_block = Block:: { + header: EthHeader { + inner: Header { + timestamp: block_timstamp, + ..Default::default() + }, + ..Default::default() + }, + ..Default::default() + }; + + let response = match method { + "eth_getBlockByNumber" => json!({ + "jsonrpc": "2.0", + "result": mock_block, + "id": request_body["id"] + }), + _ => { + let error_response = json!({ + "jsonrpc": "2.0", + "error": { "code": -32601, "message": "Method not found" }, + "id": request_body["id"] + }); + return Ok(hyper::Response::new(error_response.to_string())); + } + }; + + Ok(hyper::Response::new(response.to_string())) + } + + #[tokio::test] + async fn test_health_check_healthy() -> eyre::Result<()> { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let probes = Arc::new(Probes::default()); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + + let builder = MockHttpServer::serve(handler, now).await.unwrap(); + let builder_client = Arc::new(RpcClient::new( + format!("http://{}", builder.addr).parse::()?, + JwtSecret::random(), + 100, + PayloadSource::Builder, + )?); + + let health_handle = HealthHandle { + probes: probes.clone(), + builder_client: builder_client.clone(), + health_check_interval: 60, + max_unsafe_interval: 5, + }; + + let _ = health_handle.spawn(); + tokio::time::sleep(Duration::from_secs(2)).await; + assert!(matches!(probes.health(), Health::Healthy)); + Ok(()) + } + + #[tokio::test] + async fn test_health_check_exceeds_max_unsafe_interval() -> eyre::Result<()> { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let probes = Arc::new(Probes::default()); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + let builder = MockHttpServer::serve(handler, now - 10).await.unwrap(); + + let builder_client = Arc::new(RpcClient::new( + format!("http://{}", builder.addr).parse::()?, + JwtSecret::random(), + 100, + PayloadSource::Builder, + )?); + + let health_handle = HealthHandle { + probes: probes.clone(), + builder_client: builder_client.clone(), + health_check_interval: 60, + max_unsafe_interval: 5, + }; + + let _ = health_handle.spawn(); + tokio::time::sleep(Duration::from_secs(2)).await; + assert!(matches!(probes.health(), Health::PartialContent)); + Ok(()) + } + + #[tokio::test] + async fn test_health_check_service_unavailable() -> eyre::Result<()> { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let probes = Arc::new(Probes::default()); + let builder_client = Arc::new(RpcClient::new( + "http://127.0.0.1:6000".parse::()?, + JwtSecret::random(), + 100, + PayloadSource::Builder, + )?); + + let health_handle = HealthHandle { + probes: probes.clone(), + builder_client: builder_client.clone(), + health_check_interval: 60, + max_unsafe_interval: 5, + }; + + let _ = health_handle.spawn(); + tokio::time::sleep(Duration::from_secs(2)).await; + assert!(matches!(probes.health(), Health::PartialContent)); + Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 0d131b74..37e4ecfa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,9 +10,6 @@ pub use cli::*; mod debug_api; pub use debug_api::*; -mod health; -pub use health::{HealthLayer, HealthService}; - mod metrics; pub use metrics::*; @@ -24,3 +21,9 @@ pub use server::*; mod tracing; pub use tracing::*; + +mod probe; +pub use probe::*; + +mod health; +pub use health::*; diff --git a/src/probe.rs b/src/probe.rs new file mode 100644 index 00000000..793ae10f --- /dev/null +++ b/src/probe.rs @@ -0,0 +1,145 @@ +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures::FutureExt as _; +use jsonrpsee::{ + core::BoxError, + http_client::{HttpBody, HttpRequest, HttpResponse}, +}; +use parking_lot::Mutex; +use tower::{Layer, Service}; + +#[derive(Copy, Clone, Debug, Default)] +pub enum Health { + /// Indicates that the builder is building blocks + #[default] + Healthy, + /// Indicates that the l2 is building blocks, but the builder is not + PartialContent, + /// Indicates that blocks are not being built by either the l2 or the builder + /// + /// Service starts out unavailable until the first blocks are built + ServiceUnavailable, +} + +impl From for HttpResponse { + fn from(health: Health) -> Self { + match health { + Health::Healthy => ok(), + Health::PartialContent => partial_content(), + Health::ServiceUnavailable => service_unavailable(), + } + } +} + +#[derive(Debug, Default)] +pub struct Probes { + health: Mutex, +} + +impl Probes { + pub fn set_health(&self, value: Health) { + *self.health.lock() = value; + } + + pub fn health(&self) -> Health { + *self.health.lock() + } +} + +/// A [`Layer`] that adds probe endpoints to a service. +#[derive(Clone, Debug)] +pub struct ProbeLayer { + probes: Arc, +} + +impl ProbeLayer { + pub(crate) fn new() -> (Self, Arc) { + let probes = Arc::new(Probes::default()); + ( + Self { + probes: probes.clone(), + }, + probes, + ) + } +} + +impl Layer for ProbeLayer { + type Service = ProbeService; + + fn layer(&self, inner: S) -> Self::Service { + ProbeService { + inner, + probes: self.probes.clone(), + } + } +} + +#[derive(Clone, Debug)] +pub struct ProbeService { + inner: S, + probes: Arc, +} + +impl Service> for ProbeService +where + S: Service, Response = HttpResponse> + Send + Sync + Clone + 'static, + S::Response: 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = BoxError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, request: HttpRequest) -> Self::Future { + // See https://github.com/tower-rs/tower/blob/abb375d08cf0ba34c1fe76f66f1aba3dc4341013/tower-service/src/lib.rs#L276 + // for an explanation of this pattern + let mut service = self.clone(); + service.inner = std::mem::replace(&mut self.inner, service.inner); + + async move { + match request.uri().path() { + // Return health status + "/healthz" => Ok(service.probes.health().into()), + // Service is responding, and therefor ready + "/readyz" => Ok(ok()), + // Service is responding, and therefor live + "/livez" => Ok(ok()), + // Forward the request to the inner service + _ => service.inner.call(request).await.map_err(|e| e.into()), + } + } + .boxed() + } +} + +fn ok() -> HttpResponse { + HttpResponse::builder() + .status(200) + .body(HttpBody::from("OK")) + .unwrap() +} + +fn partial_content() -> HttpResponse { + HttpResponse::builder() + .status(206) + .body(HttpBody::from("Partial Content")) + .unwrap() +} + +fn service_unavailable() -> HttpResponse { + HttpResponse::builder() + .status(503) + .body(HttpBody::from("Service Unavailable")) + .unwrap() +} diff --git a/src/proxy.rs b/src/proxy.rs index d0bfcd40..7daff475 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,6 +1,4 @@ use crate::client::http::HttpClient; -use crate::health::HealthLayer; -use crate::health::HealthService; use crate::server::PayloadSource; use alloy_rpc_types_engine::JwtSecret; use http::Uri; @@ -9,7 +7,7 @@ use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse}; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; use tower::{Layer, Service}; -use tracing::info; +use tracing::debug; const MULTIPLEX_METHODS: [&str; 4] = [ "engine_", @@ -17,6 +15,7 @@ const MULTIPLEX_METHODS: [&str; 4] = [ "eth_sendRawTransaction", "miner_", ]; + const FORWARD_REQUESTS: [&str; 6] = [ "eth_sendRawTransaction", "eth_sendRawTransactionConditional", @@ -51,7 +50,7 @@ impl ProxyLayer { } impl Layer for ProxyLayer { - type Service = HealthService>; + type Service = ProxyService; fn layer(&self, inner: S) -> Self::Service { let l2_client = HttpClient::new( @@ -66,13 +65,11 @@ impl Layer for ProxyLayer { PayloadSource::Builder, ); - let proxy = ProxyService { + ProxyService { inner, l2_client, builder_client, - }; - - HealthLayer.layer(proxy) + } } } @@ -132,11 +129,10 @@ where }); let l2_req = HttpRequest::from_parts(parts, HttpBody::from(body_bytes)); - info!(target: "proxy::call", message = "forward request to rollup-boost server", ?method); service.l2_client.forward(l2_req, method).await } else { let req = HttpRequest::from_parts(parts, HttpBody::from(body_bytes)); - info!(target: "proxy::call", message = "proxying request to rollup-boost server", ?method); + debug!(target: "proxy::call", message = "proxying request to rollup-boost server", ?method); service.inner.call(req).await.map_err(|e| e.into()) } } else { @@ -150,23 +146,26 @@ where #[cfg(test)] mod tests { + use crate::probe::ProbeLayer; + use super::*; use alloy_primitives::{B256, Bytes, U64, U128, hex}; use alloy_rpc_types_engine::JwtSecret; use alloy_rpc_types_eth::erc4337::TransactionConditional; + use http::StatusCode; use http_body_util::BodyExt; use hyper::service::service_fn; use hyper_util::client::legacy::Client; use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::rt::{TokioExecutor, TokioIo}; use jsonrpsee::server::Server; + use jsonrpsee::types::{ErrorCode, ErrorObject}; use jsonrpsee::{ RpcModule, core::{ClientError, client::ClientT}, http_client::HttpClient, rpc_params, server::{ServerBuilder, ServerHandle}, - types::{ErrorCode, ErrorObject}, }; use serde_json::json; use std::{ @@ -400,16 +399,8 @@ mod tests { let health_check_url = format!("http://{ADDR}:{PORT}/healthz"); let health_response = client.get(health_check_url.parse::().unwrap()).await; assert!(health_response.is_ok()); - let b = health_response - .unwrap() - .into_body() - .collect() - .await - .unwrap() - .to_bytes(); - // Convert the collected bytes to a string - let body_string = String::from_utf8(b.to_vec()).unwrap(); - assert_eq!(body_string, "OK"); + let status = health_response.unwrap().status(); + assert_eq!(status, StatusCode::OK); proxy_server.stop().unwrap(); proxy_server.stopped().await; @@ -465,11 +456,17 @@ mod tests { .parse::() .unwrap(); + let (probe_layer, _probes) = ProbeLayer::new(); + let proxy_layer = ProxyLayer::new(l2_auth_uri.clone(), jwt, l2_auth_uri, jwt); // Create a layered server let server = ServerBuilder::default() - .set_http_middleware(tower::ServiceBuilder::new().layer(proxy_layer)) + .set_http_middleware( + tower::ServiceBuilder::new() + .layer(probe_layer) + .layer(proxy_layer), + ) .build(addr.parse::().unwrap()) .await .unwrap(); diff --git a/src/server.rs b/src/server.rs index 59d83cd2..22aba1b1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,9 @@ +use crate::HealthHandle; use crate::client::rpc::RpcClient; use crate::debug_api::DebugServer; +use crate::probe::{Health, Probes}; use alloy_primitives::{B256, Bytes}; +use alloy_rpc_types_eth::{Block, BlockNumberOrTag}; use metrics::counter; use moka::sync::Cache; use opentelemetry::trace::SpanKind; @@ -20,7 +23,7 @@ use op_alloy_rpc_types_engine::{ OpPayloadAttributes, }; use serde::{Deserialize, Serialize}; - +use tokio::task::JoinHandle; use tracing::{debug, info, instrument}; use jsonrpsee::proc_macros::rpc; @@ -122,13 +125,14 @@ impl ExecutionMode { } } -#[derive(Clone)] pub struct RollupBoostServer { pub l2_client: Arc, pub builder_client: Arc, pub boost_sync: bool, pub payload_trace_context: Arc, + health_handle: JoinHandle<()>, execution_mode: Arc>, + probes: Arc, } impl RollupBoostServer { @@ -137,13 +141,26 @@ impl RollupBoostServer { builder_client: RpcClient, boost_sync: bool, initial_execution_mode: ExecutionMode, + probes: Arc, + health_check_interval: u64, + max_unsafe_interval: u64, ) -> Self { + let health_handle = HealthHandle { + probes: probes.clone(), + builder_client: Arc::new(builder_client.clone()), + health_check_interval, + max_unsafe_interval, + } + .spawn(); + Self { l2_client: Arc::new(l2_client), builder_client: Arc::new(builder_client), boost_sync, payload_trace_context: Arc::new(PayloadTraceContext::new()), execution_mode: Arc::new(Mutex::new(initial_execution_mode)), + probes, + health_handle, } } @@ -156,6 +173,10 @@ impl RollupBoostServer { pub fn execution_mode(&self) -> ExecutionMode { *self.execution_mode.lock() } + + pub fn health_handle(&self) -> &JoinHandle<()> { + &self.health_handle + } } impl TryInto> for RollupBoostServer { @@ -163,7 +184,7 @@ impl TryInto> for RollupBoostServer { fn try_into(self) -> Result, Self::Error> { let mut module: RpcModule<()> = RpcModule::new(()); - module.merge(EngineApiServer::into_rpc(self.clone()))?; + module.merge(EngineApiServer::into_rpc(self))?; for method in module.method_names() { info!(?method, "method registered"); @@ -199,22 +220,22 @@ impl PayloadSource { } } -#[rpc(server, client, namespace = "engine")] +#[rpc(server, client)] pub trait EngineApi { - #[method(name = "forkchoiceUpdatedV3")] + #[method(name = "engine_forkchoiceUpdatedV3")] async fn fork_choice_updated_v3( &self, fork_choice_state: ForkchoiceState, payload_attributes: Option, ) -> RpcResult; - #[method(name = "getPayloadV3")] + #[method(name = "engine_getPayloadV3")] async fn get_payload_v3( &self, payload_id: PayloadId, ) -> RpcResult; - #[method(name = "newPayloadV3")] + #[method(name = "engine_newPayloadV3")] async fn new_payload_v3( &self, payload: ExecutionPayloadV3, @@ -222,13 +243,13 @@ pub trait EngineApi { parent_beacon_block_root: B256, ) -> RpcResult; - #[method(name = "getPayloadV4")] + #[method(name = "engine_getPayloadV4")] async fn get_payload_v4( &self, payload_id: PayloadId, ) -> RpcResult; - #[method(name = "newPayloadV4")] + #[method(name = "engine_newPayloadV4")] async fn new_payload_v4( &self, payload: OpExecutionPayloadV4, @@ -236,6 +257,9 @@ pub trait EngineApi { parent_beacon_block_root: B256, execution_requests: Vec, ) -> RpcResult; + + #[method(name = "eth_getBlockByNumber")] + async fn get_block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult; } #[async_trait] @@ -406,6 +430,10 @@ impl EngineApiServer for RollupBoostServer { })) .await } + + async fn get_block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult { + Ok(self.l2_client.get_block_by_number(number, full).await?) + } } #[derive(Debug, Clone)] @@ -583,6 +611,8 @@ impl RollupBoostServer { let (l2_payload, builder_payload) = tokio::join!(l2_client_future, builder_client_future); let (payload, context) = match (builder_payload, l2_payload) { (Ok(Some(builder)), Ok(l2_payload)) => { + // builder successfully returned a payload + self.probes.set_health(Health::Healthy); if self.execution_mode().is_fallback_enabled() { // Default to op-geth's payload Ok((l2_payload, PayloadSource::L2)) @@ -590,8 +620,16 @@ impl RollupBoostServer { Ok((builder, PayloadSource::Builder)) } } - (_, Ok(l2)) => Ok((l2, PayloadSource::L2)), - (_, Err(e)) => Err(e), + (_, Ok(l2)) => { + // builder failed to return a payload + self.probes.set_health(Health::PartialContent); + Ok((l2, PayloadSource::L2)) + } + (_, Err(e)) => { + // builder and l2 failed to return a payload + self.probes.set_health(Health::ServiceUnavailable); + Err(e) + } }?; tracing::Span::current().record("payload_source", context.to_string()); @@ -620,18 +658,20 @@ impl RollupBoostServer { #[cfg(test)] #[allow(clippy::complexity)] mod tests { + use crate::probe::ProbeLayer; + use crate::proxy::ProxyLayer; + use super::*; use alloy_primitives::hex; use alloy_primitives::{FixedBytes, U256}; + use alloy_rpc_types_engine::JwtSecret; use alloy_rpc_types_engine::{ BlobsBundleV1, ExecutionPayloadV1, ExecutionPayloadV2, PayloadStatusEnum, }; - - use alloy_rpc_types_engine::JwtSecret; - use http::Uri; + use http::{StatusCode, Uri}; use jsonrpsee::RpcModule; use jsonrpsee::http_client::HttpClient; - use jsonrpsee::server::{ServerBuilder, ServerHandle}; + use jsonrpsee::server::{Server, ServerBuilder, ServerHandle}; use parking_lot::Mutex; use std::net::SocketAddr; use std::str::FromStr; @@ -708,8 +748,9 @@ mod tests { l2_mock: MockEngineServer, builder_server: ServerHandle, builder_mock: MockEngineServer, - proxy_server: ServerHandle, - client: HttpClient, + server: ServerHandle, + rpc_client: HttpClient, + http_client: reqwest::Client, } impl TestHarness { @@ -722,50 +763,84 @@ mod tests { let l2_auth_rpc = Uri::from_str(&format!("http://{}:{}", HOST, L2_PORT)).unwrap(); let l2_client = - RpcClient::new(l2_auth_rpc, jwt_secret, 2000, PayloadSource::L2).unwrap(); + RpcClient::new(l2_auth_rpc.clone(), jwt_secret, 2000, PayloadSource::L2).unwrap(); let builder_auth_rpc = Uri::from_str(&format!("http://{}:{}", HOST, BUILDER_PORT)).unwrap(); - let builder_client = - RpcClient::new(builder_auth_rpc, jwt_secret, 2000, PayloadSource::Builder).unwrap(); + let builder_client = RpcClient::new( + builder_auth_rpc.clone(), + jwt_secret, + 2000, + PayloadSource::Builder, + ) + .unwrap(); + + let (probe_layer, probes) = ProbeLayer::new(); - let rollup_boost_client = RollupBoostServer::new( + let rollup_boost = RollupBoostServer::new( l2_client, builder_client, boost_sync, ExecutionMode::Enabled, + probes, + 60, + 5, ); - let module: RpcModule<()> = rollup_boost_client.try_into().unwrap(); + let module: RpcModule<()> = rollup_boost.try_into().unwrap(); - let proxy_server = ServerBuilder::default() + let http_middleware = + tower::ServiceBuilder::new() + .layer(probe_layer) + .layer(ProxyLayer::new( + l2_auth_rpc, + jwt_secret, + builder_auth_rpc, + jwt_secret, + )); + + let server = Server::builder() + .set_http_middleware(http_middleware) .build("0.0.0.0:8556".parse::().unwrap()) .await .unwrap() .start(module); + let l2_mock = l2_mock.unwrap_or(MockEngineServer::new()); let builder_mock = builder_mock.unwrap_or(MockEngineServer::new()); let l2_server = spawn_server(l2_mock.clone(), L2_ADDR).await; let builder_server = spawn_server(builder_mock.clone(), BUILDER_ADDR).await; + let rpc_client = HttpClient::builder() + .build(format!("http://{SERVER_ADDR}")) + .unwrap(); + let http_client = reqwest::Client::new(); + TestHarness { l2_server, l2_mock, builder_server, builder_mock, - proxy_server, - client: HttpClient::builder() - .build(format!("http://{SERVER_ADDR}")) - .unwrap(), + server, + rpc_client, + http_client, } } + async fn get(&self, path: &str) -> reqwest::Response { + self.http_client + .get(format!("http://{}/{}", SERVER_ADDR, path)) + .send() + .await + .unwrap() + } + async fn cleanup(self) { self.l2_server.stop().unwrap(); self.l2_server.stopped().await; self.builder_server.stop().unwrap(); self.builder_server.stopped().await; - self.proxy_server.stop().unwrap(); - self.proxy_server.stopped().await; + self.server.stop().unwrap(); + self.server.stopped().await; } } @@ -780,13 +855,20 @@ mod tests { async fn engine_success() { let test_harness = TestHarness::new(false, None, None).await; + // Since no blocks have been created, the service should be unavailable + let health = test_harness.get("healthz").await; + assert_eq!(health.status(), StatusCode::OK); + // test fork_choice_updated_v3 success let fcu = ForkchoiceState { head_block_hash: FixedBytes::random(), safe_block_hash: FixedBytes::random(), finalized_block_hash: FixedBytes::random(), }; - let fcu_response = test_harness.client.fork_choice_updated_v3(fcu, None).await; + let fcu_response = test_harness + .rpc_client + .fork_choice_updated_v3(fcu, None) + .await; assert!(fcu_response.is_ok()); let fcu_requests = test_harness.l2_mock.fcu_requests.clone(); { @@ -803,7 +885,7 @@ mod tests { // test new_payload_v3 success let new_payload_response = test_harness - .client + .rpc_client .new_payload_v3( test_harness .l2_mock @@ -843,7 +925,7 @@ mod tests { // test get_payload_v3 success let get_payload_response = test_harness - .client + .rpc_client .get_payload_v3(PayloadId::new([0, 0, 0, 0, 0, 0, 0, 1])) .await; assert!(get_payload_response.is_ok()); @@ -862,6 +944,11 @@ mod tests { assert_eq!(*req, PayloadId::new([0, 0, 0, 0, 0, 0, 0, 1])); } + // Now that a block has been produced by the l2 but not the builder + // the health status should be Partial Content + let health = test_harness.get("healthz").await; + assert_eq!(health.status(), StatusCode::PARTIAL_CONTENT); + test_harness.cleanup().await; } @@ -873,7 +960,10 @@ mod tests { safe_block_hash: FixedBytes::random(), finalized_block_hash: FixedBytes::random(), }; - let fcu_response = test_harness.client.fork_choice_updated_v3(fcu, None).await; + let fcu_response = test_harness + .rpc_client + .fork_choice_updated_v3(fcu, None) + .await; assert!(fcu_response.is_ok()); sleep(std::time::Duration::from_millis(100)).await; @@ -889,7 +979,7 @@ mod tests { // test new_payload_v3 success let new_payload_response = test_harness - .client + .rpc_client .new_payload_v3( test_harness .l2_mock @@ -932,7 +1022,7 @@ mod tests { // test get_payload_v3 return l2 payload if builder payload is invalid let get_payload_response = test_harness - .client + .rpc_client .get_payload_v3(PayloadId::new([0, 0, 0, 0, 0, 0, 0, 0])) .await; assert!(get_payload_response.is_ok()); @@ -1006,7 +1096,10 @@ mod tests { safe_block_hash: FixedBytes::random(), finalized_block_hash: FixedBytes::random(), }; - let fcu_response = test_harness.client.fork_choice_updated_v3(fcu, None).await; + let fcu_response = test_harness + .rpc_client + .fork_choice_updated_v3(fcu, None) + .await; assert!(fcu_response.is_ok()); // wait for builder to observe the FCU call @@ -1019,7 +1112,7 @@ mod tests { } // Test getPayload call - let get_res = test_harness.client.get_payload_v3(same_id).await; + let get_res = test_harness.rpc_client.get_payload_v3(same_id).await; assert!(get_res.is_ok()); // wait for builder to observe the getPayload call diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a7496f31..e4489610 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -17,7 +17,7 @@ use op_alloy_rpc_types_engine::OpPayloadAttributes; use parking_lot::Mutex; use proxy::{ProxyHandler, start_proxy_server}; use rollup_boost::DebugClient; -use rollup_boost::{AuthClientLayer, AuthClientService}; +use rollup_boost::{AuthLayer, AuthService}; use rollup_boost::{EngineApiClient, OpExecutionPayloadEnvelope, Version}; use rollup_boost::{NewPayload, PayloadSource}; use services::op_reth::{AUTH_RPC_PORT, OpRethConfig, OpRethImage, OpRethMehods, P2P_PORT}; @@ -36,6 +36,7 @@ use testcontainers::runners::AsyncRunner; use testcontainers::{ContainerAsync, ImageExt}; use time::{OffsetDateTime, format_description}; use tokio::io::AsyncWriteExt as _; +use tower_http::sensitive_headers::SetSensitiveRequestHeaders; /// Default JWT token for testing purposes pub const JWT_SECRET: &str = "688f5d737bad920bdfb2fc2f488d6b6209eebda1dae949a8de91398d932c517a"; @@ -68,13 +69,13 @@ impl LogConsumer for LoggingConsumer { } pub struct EngineApi { - pub engine_api_client: HttpClient>, + pub engine_api_client: HttpClient>>, } // TODO: Use client/rpc.rs instead impl EngineApi { pub fn new(url: &str, secret: &str) -> eyre::Result { - let secret_layer = AuthClientLayer::new(JwtSecret::from_str(secret)?); + let secret_layer = AuthLayer::new(JwtSecret::from_str(secret)?); let middleware = tower::ServiceBuilder::default().layer(secret_layer); let client = jsonrpsee::http_client::HttpClientBuilder::default() .set_http_middleware(middleware)