Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 10 additions & 46 deletions crates/node/rpc/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,15 @@
use async_trait::async_trait;
use jsonrpsee::{
core::RpcResult,
types::{ErrorCode, ErrorObject},
};
use jsonrpsee::core::RpcResult;
use rollup_boost::Health;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

use crate::jsonrpsee::HealthzApiServer;

/// Key for the rollup boost health status.
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// | Execution Mode | Healthy | PartialContent | Service Unavailable |
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// | Enabled | - Request-path: L2 succeeds | - Request-path: builder fails/stale | - Request-path: L2 fails |
/// | | (get/new payload) → 200 | while L2 succeeds → 206 | (error from L2) → 503 |
/// | | - Background: builder | - Background: builder fetch fails or | - Background: never sets 503 |
/// | | latest-unsafe is fresh → | latest-unsafe is stale → 206 | |
/// | | 200 | | |
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// | DryRun | - Request-path: L2 succeeds | - Never set in DryRun | - Request-path: L2 fails |
/// | | (always returns L2) → 200 | (degrade only in Enabled) | (error from L2) → 503 |
/// | | - Background: builder stale | | - Background: never sets 503 |
/// | | ignored (remains 200) | | |
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// | Disabled | - Request-path: L2 succeeds | - Never set in Disabled | - Request-path: L2 fails |
/// | | (builder skipped) → 200 | (degrade only in Enabled) | (error from L2) → 503 |
/// | | - Background: N/A | | - Background: never sets 503 |
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// The rollup boost health status.
///
/// This type is the same as [`Health`], but it implements `serde::Serialize`
/// and `serde::Deserialize`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub enum RollupBoostHealth {
/// Rollup boost is healthy.
Healthy,
Expand All @@ -55,8 +34,6 @@ impl From<Health> for RollupBoostHealth {
pub struct HealthzResponse {
/// The application version.
pub version: String,
/// The rollup boost health.
pub rollup_boost_health: RollupBoostHealth,
}

/// A query to get the health of the rollup boost server.
Expand All @@ -67,32 +44,19 @@ pub struct RollupBoostHealthQuery {
}

/// The healthz rpc server.
#[derive(Debug)]
pub struct HealthzRpc {
/// The rollup boost health.
pub rollup_boost_health: mpsc::Sender<RollupBoostHealthQuery>,
}
#[derive(Debug, Default)]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you import derive_more you can use derive_more::Constructor and remove the new method below

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to use HealthzRpc::default() instead?

pub struct HealthzRpc;

impl HealthzRpc {
/// Constructs a new [`HealthzRpc`] given the rollup boost health sender.
pub const fn new(rollup_boost_health: mpsc::Sender<RollupBoostHealthQuery>) -> Self {
Self { rollup_boost_health }
/// Constructs a new [`HealthzRpc`].
pub const fn new() -> Self {
Self
}
}

#[async_trait]
impl HealthzApiServer for HealthzRpc {
async fn healthz(&self) -> RpcResult<HealthzResponse> {
let (tx, rx) = oneshot::channel();

self.rollup_boost_health
.send(RollupBoostHealthQuery { sender: tx })
.await
.map_err(|_| ErrorObject::from(ErrorCode::InternalError))?;

let rollup_boost_health =
rx.await.map_err(|_| ErrorObject::from(ErrorCode::InternalError))?;

Ok(HealthzResponse { version: env!("CARGO_PKG_VERSION").to_string(), rollup_boost_health })
Ok(HealthzResponse { version: env!("CARGO_PKG_VERSION").to_string() })
}
}
1 change: 1 addition & 0 deletions crates/node/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ jsonrpsee = { workspace = true, features = ["server"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tower.workspace = true
http-body-util.workspace = true
http.workspace = true

# metrics
metrics = { workspace = true, optional = true }
Expand Down
157 changes: 141 additions & 16 deletions crates/node/service/src/actors/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,141 @@

use crate::{NodeActor, actors::CancellableContext};
use async_trait::async_trait;
use kona_gossip::P2pRpcRequest;
use kona_rpc::{
AdminApiServer, AdminRpc, DevEngineApiServer, DevEngineRpc, HealthzApiServer, HealthzRpc,
NetworkAdminQuery, OpP2PApiServer, RollupBoostAdminQuery, RollupBoostHealthQuery,
RollupNodeApiServer, SequencerAdminAPIClient, WsRPC, WsServer,
};
use std::time::Duration;

use jsonrpsee::{
RpcModule,
core::RegisterMethodError,
server::{Server, ServerHandle, middleware::http::ProxyGetRequestLayer},
server::{HttpBody, HttpRequest, HttpResponse, Server, ServerHandle, middleware::http::ProxyGetRequestLayer},
};
use kona_engine::EngineQueries;
use kona_rpc::{L1WatcherQueries, P2pRpc, RollupRpc, RpcBuilder};
use tokio::sync::mpsc;
use kona_gossip::P2pRpcRequest;
use kona_rpc::{
AdminApiServer, AdminRpc, DevEngineApiServer, DevEngineRpc, HealthzApiServer, HealthzRpc,
L1WatcherQueries, NetworkAdminQuery, OpP2PApiServer, P2pRpc, RollupBoostAdminQuery,
RollupBoostHealth, RollupBoostHealthQuery, RollupNodeApiServer, RollupRpc, RpcBuilder,
SequencerAdminAPIClient, WsRPC, WsServer,
};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
use tower::{Layer, Service};

/// Request path of the rollup boost health endpoint.
///
/// `GET` requests whose URI path equals this value exactly are answered by
/// [`RollupBoostHealthMiddleware`] instead of being forwarded to the inner service.
const ROLLUP_BOOST_HEALTHZ_PATH: &str = "/kona-rollup-boost/healthz";

/// Tower [`Layer`] that places a [`RollupBoostHealthMiddleware`] in front of a service.
///
/// The layer itself only carries the sender used to ask for the rollup boost health; all
/// request handling happens in the middleware it produces.
#[derive(Debug, Clone)]
struct RollupBoostHealthLayer {
    /// Channel on which [`RollupBoostHealthQuery`] messages are submitted.
    health_tx: mpsc::Sender<RollupBoostHealthQuery>,
}

impl RollupBoostHealthLayer {
    /// Creates a layer around the given rollup boost health query sender.
    const fn new(health_tx: mpsc::Sender<RollupBoostHealthQuery>) -> Self {
        Self { health_tx }
    }
}

impl<S> Layer<S> for RollupBoostHealthLayer {
    type Service = RollupBoostHealthMiddleware<S>;

    /// Wraps `inner`, handing the new middleware its own clone of the query sender.
    fn layer(&self, inner: S) -> Self::Service {
        let health_tx = self.health_tx.clone();
        RollupBoostHealthMiddleware { inner, health_tx }
    }
}

/// Tower service that answers `GET /kona-rollup-boost/healthz` itself and forwards every other
/// request to the inner service.
///
/// The status is obtained by sending a [`RollupBoostHealthQuery`] over `health_tx` and is
/// returned as a plain-text HTTP response: `Healthy` → 200, `PartialContent` → 206,
/// `ServiceUnavailable` → 503. A query that cannot be sent or is never answered is also
/// reported as 503.
///
/// ## Health Status Determination
///
/// NOTE(review): the table below describes how the status is expected to be produced per
/// execution mode; that logic lives outside this middleware, so confirm it against the
/// component that answers the health query.
///
/// ```text
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// | Execution Mode | Healthy | PartialContent | Service Unavailable |
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// | Enabled | - Request-path: L2 succeeds | - Request-path: builder fails/stale | - Request-path: L2 fails |
/// | | (get/new payload) → 200 | while L2 succeeds → 206 | (error from L2) → 503 |
/// | | - Background: builder | - Background: builder fetch fails or | - Background: never sets 503 |
/// | | latest-unsafe is fresh → | latest-unsafe is stale → 206 | |
/// | | 200 | | |
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// | DryRun | - Request-path: L2 succeeds | - Never set in DryRun | - Request-path: L2 fails |
/// | | (always returns L2) → 200 | (degrade only in Enabled) | (error from L2) → 503 |
/// | | - Background: builder stale | | - Background: never sets 503 |
/// | | ignored (remains 200) | | |
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// | Disabled | - Request-path: L2 succeeds | - Never set in Disabled | - Request-path: L2 fails |
/// | | (builder skipped) → 200 | (degrade only in Enabled) | (error from L2) → 503 |
/// | | - Background: N/A | | - Background: never sets 503 |
/// +----------------+-------------------------------+--------------------------------------+-------------------------------+
/// ```
#[derive(Debug, Clone)]
struct RollupBoostHealthMiddleware<S> {
/// The wrapped service that receives every request other than the health probe.
inner: S,
/// Sender used to submit a [`RollupBoostHealthQuery`] for each health probe.
health_tx: mpsc::Sender<RollupBoostHealthQuery>,
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be a middleware but a simple RPC endpoint. Can you implement this method the same way as we implemented the top-level healthz endpoint? You don't need to redefine a new struct or anything, just add a new method to HealthzApiClient

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See response above

#3130 (comment)


impl<S, ReqBody> Service<HttpRequest<ReqBody>> for RollupBoostHealthMiddleware<S>
where
S: Service<HttpRequest<ReqBody>, Response = HttpResponse> + Clone + Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
S::Future: Send,
ReqBody: Send + 'static,
{
type Response = HttpResponse;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: HttpRequest<ReqBody>) -> Self::Future {
// Check if this is a GET request to the rollup boost healthz endpoint.
if req.method() == http::Method::GET && req.uri().path() == ROLLUP_BOOST_HEALTHZ_PATH {
let health_tx = self.health_tx.clone();

Box::pin(async move {
// Query the health status via the engine actor.
let (tx, rx) = oneshot::channel();
let health = match health_tx.send(RollupBoostHealthQuery { sender: tx }).await {
Ok(()) => rx.await.ok(),
Err(_) => None,
};

// Build the appropriate response.
let (status, body_str) = match health {
Some(RollupBoostHealth::Healthy) => (http::StatusCode::OK, "OK"),
Some(RollupBoostHealth::PartialContent) => {
(http::StatusCode::PARTIAL_CONTENT, "Partial Content")
}
Some(RollupBoostHealth::ServiceUnavailable) | None => {
(http::StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable")
}
};

let response = http::Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "text/plain")
.body(HttpBody::from(body_str))
.expect("Failed to build rollup boost healthz response");

Ok(response)
})
} else {
// Pass through to the inner service.
let fut = self.inner.call(req);
Box::pin(async move { fut.await })
}
}
}

/// An error returned by the [`RpcActor`].
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -90,8 +208,10 @@ impl<S: SequencerAdminAPIClient> CancellableContext for RpcContext<S> {
async fn launch(
config: &RpcBuilder,
module: RpcModule<()>,
health_tx: mpsc::Sender<RollupBoostHealthQuery>,
) -> Result<ServerHandle, std::io::Error> {
let middleware = tower::ServiceBuilder::new()
.layer(RollupBoostHealthLayer::new(health_tx))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that implemented as a layer and not just a regular RPC endpoint?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jsonrpsee is a JSON-RPC server, so every response comes back as HTTP 200 with a JSON body; even errors are JSON-RPC error objects rather than actual HTTP error codes. The spec for this endpoint, however, requires real HTTP status codes (200/206/503) with plain-text bodies, which a normal RPC method cannot produce. A tower layer lets us intercept the request before it reaches jsonrpsee and return a raw HTTP response.

.layer(
ProxyGetRequestLayer::new([("/healthz", "healthz")])
.expect("Critical: Failed to build GET method proxy"),
Expand Down Expand Up @@ -128,7 +248,7 @@ impl<S: SequencerAdminAPIClient + 'static> NodeActor for RpcActor<S> {
) -> Result<(), Self::Error> {
let mut modules = RpcModule::new(());

modules.merge(HealthzRpc::new(rollup_boost_health).into_rpc())?;
modules.merge(HealthzRpc::new().into_rpc())?;

// Build the p2p rpc module.
modules.merge(P2pRpc::new(p2p_network).into_rpc())?;
Expand All @@ -154,12 +274,12 @@ impl<S: SequencerAdminAPIClient + 'static> NodeActor for RpcActor<S> {

let restarts = self.config.restart_count();

let mut handle = launch(&self.config, modules.clone()).await?;
let mut handle = launch(&self.config, modules.clone(), rollup_boost_health.clone()).await?;

for _ in 0..=restarts {
tokio::select! {
_ = handle.clone().stopped() => {
match launch(&self.config, modules.clone()).await {
match launch(&self.config, modules.clone(), rollup_boost_health.clone()).await {
Ok(h) => handle = h,
Err(err) => {
error!(target: "rpc", ?err, "Failed to launch rpc server");
Expand Down Expand Up @@ -189,6 +309,11 @@ mod tests {

use super::*;

/// Builds a throwaway health query sender for tests that only need to launch the server.
///
/// The matching receiver is dropped when this function returns, so sends on the returned
/// sender fail.
fn test_health_tx() -> mpsc::Sender<RollupBoostHealthQuery> {
    let (sender, _receiver) = mpsc::channel(1);
    sender
}

#[tokio::test]
async fn test_launch_no_modules() {
let launcher = RpcBuilder {
Expand All @@ -199,7 +324,7 @@ mod tests {
ws_enabled: false,
dev_enabled: false,
};
let result = launch(&launcher, RpcModule::new(())).await;
let result = launch(&launcher, RpcModule::new(()), test_health_tx()).await;
assert!(result.is_ok());
}

Expand All @@ -219,7 +344,7 @@ mod tests {
modules.merge(RpcModule::new(())).expect("module merge");
modules.merge(RpcModule::new(())).expect("module merge");

let result = launch(&launcher, modules).await;
let result = launch(&launcher, modules, test_health_tx()).await;
assert!(result.is_ok());
}
}
Loading