diff --git a/crates/node/rpc/src/health.rs b/crates/node/rpc/src/health.rs index e99a8328be..bb32894d63 100644 --- a/crates/node/rpc/src/health.rs +++ b/crates/node/rpc/src/health.rs @@ -1,36 +1,15 @@ use async_trait::async_trait; -use jsonrpsee::{ - core::RpcResult, - types::{ErrorCode, ErrorObject}, -}; +use jsonrpsee::core::RpcResult; use rollup_boost::Health; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use crate::jsonrpsee::HealthzApiServer; -/// Key for the rollup boost health status. -/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ -/// | Execution Mode | Healthy | PartialContent | Service Unavailable | -/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ -/// | Enabled | - Request-path: L2 succeeds | - Request-path: builder fails/stale | - Request-path: L2 fails | -/// | | (get/new payload) → 200 | while L2 succeeds → 206 | (error from L2) → 503 | -/// | | - Background: builder | - Background: builder fetch fails or | - Background: never sets 503 | -/// | | latest-unsafe is fresh → | latest-unsafe is stale → 206 | | -/// | | 200 | | | -/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ -/// | DryRun | - Request-path: L2 succeeds | - Never set in DryRun | - Request-path: L2 fails | -/// | | (always returns L2) → 200 | (degrade only in Enabled) | (error from L2) → 503 | -/// | | - Background: builder stale | | - Background: never sets 503 | -/// | | ignored (remains 200) | | | -/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ -/// | Disabled | - Request-path: L2 succeeds | - Never set in Disabled | - Request-path: L2 fails | -/// | | (builder skipped) → 200 | (degrade only in Enabled) | (error from L2) → 503 | -/// | | - Background: N/A | | - Background: never sets 503 | -/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ +/// The rollup boost health status. /// /// This type is the same as [`Health`], but it implements `serde::Serialize` /// and `serde::Deserialize`. -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize, serde::Serialize)] pub enum RollupBoostHealth { /// Rollup boost is healthy. Healthy, @@ -55,8 +34,6 @@ impl From for RollupBoostHealth { pub struct HealthzResponse { /// The application version. pub version: String, - /// The rollup boost health. - pub rollup_boost_health: RollupBoostHealth, } /// A query to get the health of the rollup boost server. @@ -67,32 +44,19 @@ pub struct RollupBoostHealthQuery { } /// The healthz rpc server. -#[derive(Debug)] -pub struct HealthzRpc { - /// The rollup boost health. - pub rollup_boost_health: mpsc::Sender, -} +#[derive(Debug, Default)] +pub struct HealthzRpc; impl HealthzRpc { - /// Constructs a new [`HealthzRpc`] given the rollup boost health sender. - pub const fn new(rollup_boost_health: mpsc::Sender) -> Self { - Self { rollup_boost_health } + /// Constructs a new [`HealthzRpc`]. + pub const fn new() -> Self { + Self } } #[async_trait] impl HealthzApiServer for HealthzRpc { async fn healthz(&self) -> RpcResult { - let (tx, rx) = oneshot::channel(); - - self.rollup_boost_health - .send(RollupBoostHealthQuery { sender: tx }) - .await - .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; - - let rollup_boost_health = - rx.await.map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; - - Ok(HealthzResponse { version: env!("CARGO_PKG_VERSION").to_string(), rollup_boost_health }) + Ok(HealthzResponse { version: env!("CARGO_PKG_VERSION").to_string() }) } } diff --git a/crates/node/service/Cargo.toml b/crates/node/service/Cargo.toml index 566fcbc24e..f4042fb797 100644 --- a/crates/node/service/Cargo.toml +++ b/crates/node/service/Cargo.toml @@ -66,6 +66,7 @@ jsonrpsee = { workspace = true, features = ["server"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tower.workspace = true http-body-util.workspace = true +http.workspace = true # metrics metrics = { workspace = true, optional = true } diff --git a/crates/node/service/src/actors/rpc.rs b/crates/node/service/src/actors/rpc.rs index b6f33699e2..1e2a7fabb4 100644 --- a/crates/node/service/src/actors/rpc.rs +++ b/crates/node/service/src/actors/rpc.rs @@ -2,23 +2,141 @@ use crate::{NodeActor, actors::CancellableContext}; use async_trait::async_trait; -use kona_gossip::P2pRpcRequest; -use kona_rpc::{ - AdminApiServer, AdminRpc, DevEngineApiServer, DevEngineRpc, HealthzApiServer, HealthzRpc, - NetworkAdminQuery, OpP2PApiServer, RollupBoostAdminQuery, RollupBoostHealthQuery, - RollupNodeApiServer, SequencerAdminAPIClient, WsRPC, WsServer, -}; -use std::time::Duration; - use jsonrpsee::{ RpcModule, core::RegisterMethodError, - server::{Server, ServerHandle, middleware::http::ProxyGetRequestLayer}, + server::{HttpBody, HttpRequest, HttpResponse, Server, ServerHandle, middleware::http::ProxyGetRequestLayer}, }; use kona_engine::EngineQueries; -use kona_rpc::{L1WatcherQueries, P2pRpc, RollupRpc, RpcBuilder}; -use tokio::sync::mpsc; +use kona_gossip::P2pRpcRequest; +use kona_rpc::{ + AdminApiServer, AdminRpc, DevEngineApiServer, DevEngineRpc, HealthzApiServer, HealthzRpc, + L1WatcherQueries, NetworkAdminQuery, OpP2PApiServer, P2pRpc, RollupBoostAdminQuery, + RollupBoostHealth, RollupBoostHealthQuery, RollupNodeApiServer, RollupRpc, RpcBuilder, + SequencerAdminAPIClient, WsRPC, WsServer, +}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; +use tower::{Layer, Service}; + +/// The path for the rollup boost healthz endpoint. +const ROLLUP_BOOST_HEALTHZ_PATH: &str = "/kona-rollup-boost/healthz"; + +/// A tower layer that intercepts requests to `/kona-rollup-boost/healthz` +/// and returns the appropriate HTTP response based on health status. +#[derive(Debug, Clone)] +struct RollupBoostHealthLayer { + /// The rollup boost health query sender. + health_tx: mpsc::Sender, +} + +impl RollupBoostHealthLayer { + /// Constructs a new [`RollupBoostHealthLayer`]. + const fn new(health_tx: mpsc::Sender) -> Self { + Self { health_tx } + } +} + +impl Layer for RollupBoostHealthLayer { + type Service = RollupBoostHealthMiddleware; + + fn layer(&self, inner: S) -> Self::Service { + RollupBoostHealthMiddleware { inner, health_tx: self.health_tx.clone() } + } +} + +/// The middleware service that handles `/kona-rollup-boost/healthz` requests. +/// +/// ## Health Status Determination +/// +/// ```text +/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ +/// | Execution Mode | Healthy | PartialContent | Service Unavailable | +/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ +/// | Enabled | - Request-path: L2 succeeds | - Request-path: builder fails/stale | - Request-path: L2 fails | +/// | | (get/new payload) → 200 | while L2 succeeds → 206 | (error from L2) → 503 | +/// | | - Background: builder | - Background: builder fetch fails or | - Background: never sets 503 | +/// | | latest-unsafe is fresh → | latest-unsafe is stale → 206 | | +/// | | 200 | | | +/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ +/// | DryRun | - Request-path: L2 succeeds | - Never set in DryRun | - Request-path: L2 fails | +/// | | (always returns L2) → 200 | (degrade only in Enabled) | (error from L2) → 503 | +/// | | - Background: builder stale | | - Background: never sets 503 | +/// | | ignored (remains 200) | | | +/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ +/// | Disabled | - Request-path: L2 succeeds | - Never set in Disabled | - Request-path: L2 fails | +/// | | (builder skipped) → 200 | (degrade only in Enabled) | (error from L2) → 503 | +/// | | - Background: N/A | | - Background: never sets 503 | +/// +----------------+-------------------------------+--------------------------------------+-------------------------------+ +/// ``` +#[derive(Debug, Clone)] +struct RollupBoostHealthMiddleware { + /// The inner service. + inner: S, + /// The rollup boost health query sender. + health_tx: mpsc::Sender, +} + +impl Service> for RollupBoostHealthMiddleware +where + S: Service, Response = HttpResponse> + Clone + Send + 'static, + S::Error: Into>, + S::Future: Send, + ReqBody: Send + 'static, +{ + type Response = HttpResponse; + type Error = S::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: HttpRequest) -> Self::Future { + // Check if this is a GET request to the rollup boost healthz endpoint. + if req.method() == http::Method::GET && req.uri().path() == ROLLUP_BOOST_HEALTHZ_PATH { + let health_tx = self.health_tx.clone(); + + Box::pin(async move { + // Query the health status via the engine actor. + let (tx, rx) = oneshot::channel(); + let health = match health_tx.send(RollupBoostHealthQuery { sender: tx }).await { + Ok(()) => rx.await.ok(), + Err(_) => None, + }; + + // Build the appropriate response. + let (status, body_str) = match health { + Some(RollupBoostHealth::Healthy) => (http::StatusCode::OK, "OK"), + Some(RollupBoostHealth::PartialContent) => { + (http::StatusCode::PARTIAL_CONTENT, "Partial Content") + } + Some(RollupBoostHealth::ServiceUnavailable) | None => { + (http::StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable") + } + }; + + let response = http::Response::builder() + .status(status) + .header(http::header::CONTENT_TYPE, "text/plain") + .body(HttpBody::from(body_str)) + .expect("Failed to build rollup boost healthz response"); + + Ok(response) + }) + } else { + // Pass through to the inner service. + let fut = self.inner.call(req); + Box::pin(async move { fut.await }) + } + } +} /// An error returned by the [`RpcActor`]. #[derive(Debug, thiserror::Error)] @@ -90,8 +208,10 @@ impl CancellableContext for RpcContext { async fn launch( config: &RpcBuilder, module: RpcModule<()>, + health_tx: mpsc::Sender, ) -> Result { let middleware = tower::ServiceBuilder::new() + .layer(RollupBoostHealthLayer::new(health_tx)) .layer( ProxyGetRequestLayer::new([("/healthz", "healthz")]) .expect("Critical: Failed to build GET method proxy"), @@ -128,7 +248,7 @@ impl NodeActor for RpcActor { ) -> Result<(), Self::Error> { let mut modules = RpcModule::new(()); - modules.merge(HealthzRpc::new(rollup_boost_health).into_rpc())?; + modules.merge(HealthzRpc::new().into_rpc())?; // Build the p2p rpc module. modules.merge(P2pRpc::new(p2p_network).into_rpc())?; @@ -154,12 +274,12 @@ impl NodeActor for RpcActor { let restarts = self.config.restart_count(); - let mut handle = launch(&self.config, modules.clone()).await?; + let mut handle = launch(&self.config, modules.clone(), rollup_boost_health.clone()).await?; for _ in 0..=restarts { tokio::select! { _ = handle.clone().stopped() => { - match launch(&self.config, modules.clone()).await { + match launch(&self.config, modules.clone(), rollup_boost_health.clone()).await { Ok(h) => handle = h, Err(err) => { error!(target: "rpc", ?err, "Failed to launch rpc server"); @@ -189,6 +309,11 @@ mod tests { use super::*; + fn test_health_tx() -> mpsc::Sender { + let (tx, _rx) = mpsc::channel(1); + tx + } + #[tokio::test] async fn test_launch_no_modules() { let launcher = RpcBuilder { @@ -199,7 +324,7 @@ mod tests { ws_enabled: false, dev_enabled: false, }; - let result = launch(&launcher, RpcModule::new(())).await; + let result = launch(&launcher, RpcModule::new(()), test_health_tx()).await; assert!(result.is_ok()); } @@ -219,7 +344,7 @@ mod tests { modules.merge(RpcModule::new(())).expect("module merge"); modules.merge(RpcModule::new(())).expect("module merge"); - let result = launch(&launcher, modules).await; + let result = launch(&launcher, modules, test_health_tx()).await; assert!(result.is_ok()); } }