Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/pbs/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub(crate) const STATUS_ENDPOINT_TAG: &str = "status";
pub(crate) const REGISTER_VALIDATOR_ENDPOINT_TAG: &str = "register_validator";
pub(crate) const SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG: &str = "submit_blinded_block";
pub(crate) const GET_HEADER_ENDPOINT_TAG: &str = "get_header";
29 changes: 19 additions & 10 deletions crates/pbs/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use alloy::primitives::{B256, U256};
use alloy::rpc::types::beacon::BlsPublicKey;
use alloy::{
primitives::{B256, U256},
rpc::types::beacon::BlsPublicKey,
};
use axum::{http::StatusCode, response::IntoResponse};
use thiserror::Error;

Expand All @@ -10,19 +12,26 @@ pub enum PbsClientError {
NoPayload,
}

impl IntoResponse for PbsClientError {
fn into_response(self) -> axum::response::Response {
impl PbsClientError {
pub fn status_code(&self) -> StatusCode {
match self {
PbsClientError::NoResponse => {
(StatusCode::SERVICE_UNAVAILABLE, "no response from relays").into_response()
}
PbsClientError::NoPayload => {
(StatusCode::BAD_GATEWAY, "no payload from relays").into_response()
}
PbsClientError::NoResponse => StatusCode::SERVICE_UNAVAILABLE,
PbsClientError::NoPayload => StatusCode::BAD_GATEWAY,
}
}
}

impl IntoResponse for PbsClientError {
fn into_response(self) -> axum::response::Response {
let msg = match self {
PbsClientError::NoResponse => "no response from relays",
PbsClientError::NoPayload => "no payload from relays",
};

(self.status_code(), msg).into_response()
}
}

#[derive(Debug, Error)]
pub enum PbsError {
#[error("axum error: {0}")]
Expand Down
1 change: 1 addition & 0 deletions crates/pbs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// implements https://github.com/ethereum/builder-specs and multiplexes to multiple builderAPI compatible clients (ie MEV Boost relays)

mod boost;
mod constants;
mod error;
mod metrics;
mod mev_boost;
Expand Down
60 changes: 35 additions & 25 deletions crates/pbs/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,43 @@
//! Metrics for PBS module
//! We collect two types of metrics within the PBS module:
//! - what PBS receives from relays
//! - what PBS returns to the beacon node

use lazy_static::lazy_static;
use prometheus::{histogram_opts, opts, HistogramVec, IntCounterVec, Registry};
use prometheus::{
register_histogram_vec_with_registry, register_int_counter_vec_with_registry, HistogramVec,
IntCounterVec, Registry,
};

lazy_static! {
pub static ref PBS_METRICS_REGISTRY: prometheus::Registry =
pub static ref PBS_METRICS_REGISTRY: Registry =
Registry::new_custom(Some("cb_pbs".to_string()), None).unwrap();
pub static ref REQUESTS_RECEIVED: IntCounterVec =
IntCounterVec::new(opts!("requests_received", "Number of requests received"), &[
"endpoint",
])
.unwrap();
pub static ref RELAY_RESPONSES: IntCounterVec =
IntCounterVec::new(opts!("relay_response", "Number of requests received"), &[
"code", "endpoint", "relay_id"
])
.unwrap();
pub static ref RELAY_RESPONSE_TIME: HistogramVec =
HistogramVec::new(histogram_opts!("relay_response_time", "Relay response times"), &[
"endpoint", "relay_id"
])
.unwrap();
}

// TODO: this can be done with the macros, need to fix the types
pub fn register_default_metrics() {
PBS_METRICS_REGISTRY.register(Box::new(REQUESTS_RECEIVED.clone())).expect("failed to register");
// FROM RELAYS
/// Status code received by relay by endpoint
pub static ref RELAY_STATUS_CODE: IntCounterVec = register_int_counter_vec_with_registry!(
"relay_status_code",
"HTTP status code received by relay",
&["http_status_code", "endpoint", "relay_id"],
PBS_METRICS_REGISTRY
)
.unwrap();

PBS_METRICS_REGISTRY.register(Box::new(RELAY_RESPONSES.clone())).expect("failed to register");
/// Latency by relay by endpoint
pub static ref RELAY_LATENCY: HistogramVec = register_histogram_vec_with_registry!(
"relay_latency",
"HTTP latency by relay",
&["endpoint", "relay_id"],
PBS_METRICS_REGISTRY
)
.unwrap();

PBS_METRICS_REGISTRY
.register(Box::new(RELAY_RESPONSE_TIME.clone()))
.expect("failed to register");
// TO BEACON NODE
/// Status code returned to beacon node by endpoint
pub static ref BEACON_NODE_STATUS: IntCounterVec = register_int_counter_vec_with_registry!(
"beacon_node_status_code",
"HTTP status code returned to beacon node",
&["http_status_code", "endpoint"],
PBS_METRICS_REGISTRY
).unwrap();
}
22 changes: 15 additions & 7 deletions crates/pbs/src/mev_boost/get_header.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{sync::Arc, time::Duration};

use alloy::primitives::{B256, U256};
use alloy::rpc::types::beacon::BlsPublicKey;
use alloy::{
primitives::{B256, U256},
rpc::types::beacon::BlsPublicKey,
};
use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
config::PbsConfig,
Expand All @@ -15,8 +17,9 @@ use reqwest::{header::USER_AGENT, StatusCode};
use tracing::{debug, error};

use crate::{
constants::GET_HEADER_ENDPOINT_TAG,
error::{PbsError, ValidationError},
metrics::{RELAY_RESPONSES, RELAY_RESPONSE_TIME},
metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
state::{BuilderApiState, PbsState},
types::{SignedExecutionPayloadHeader, EMPTY_TX_ROOT_HASH},
GetHeaderParams, GetHeaderReponse,
Expand Down Expand Up @@ -84,7 +87,8 @@ async fn send_get_header(
) -> Result<Option<GetHeaderReponse>, PbsError> {
let url = relay.get_header_url(slot, parent_hash, validator_pubkey);

let timer = RELAY_RESPONSE_TIME.with_label_values(&["get_header", &relay.id]).start_timer();
let timer =
RELAY_LATENCY.with_label_values(&[GET_HEADER_ENDPOINT_TAG, &relay.id]).start_timer();
let res = client
.get(url)
.timeout(Duration::from_millis(config.timeout_get_header_ms))
Expand All @@ -94,7 +98,9 @@ async fn send_get_header(
timer.observe_duration();

let status = res.status();
RELAY_RESPONSES.with_label_values(&[&status.to_string(), "get_header", &relay.id]).inc();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
Expand Down Expand Up @@ -184,8 +190,10 @@ fn validate_header(

#[cfg(test)]
mod tests {
use alloy::primitives::{B256, U256};
use alloy::rpc::types::beacon::BlsPublicKey;
use alloy::{
primitives::{B256, U256},
rpc::types::beacon::BlsPublicKey,
};
use blst::min_pk;
use cb_common::{pbs::RelayEntry, signature::sign_builder_message, types::Chain};

Expand Down
12 changes: 8 additions & 4 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use reqwest::header::USER_AGENT;
use tracing::error;

use crate::{
constants::REGISTER_VALIDATOR_ENDPOINT_TAG,
error::PbsError,
metrics::{RELAY_RESPONSES, RELAY_RESPONSE_TIME},
metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
state::{BuilderApiState, PbsState},
};

Expand Down Expand Up @@ -62,8 +63,9 @@ async fn send_register_validator(
) -> Result<(), PbsError> {
let url = relay.register_validator_url();

let timer =
RELAY_RESPONSE_TIME.with_label_values(&["register_validator", &relay.id]).start_timer();
let timer = RELAY_LATENCY
.with_label_values(&[REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id])
.start_timer();
let res = client
.post(url)
.timeout(Duration::from_millis(timeout_ms))
Expand All @@ -76,7 +78,9 @@ async fn send_register_validator(
// TODO: send to relay monitor

let status = res.status();
RELAY_RESPONSES.with_label_values(&[&status.to_string(), "get_header", &relay.id]).inc();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
Expand Down
10 changes: 6 additions & 4 deletions crates/pbs/src/mev_boost/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use futures::future::select_ok;
use reqwest::header::USER_AGENT;

use crate::{
constants::STATUS_ENDPOINT_TAG,
error::PbsError,
metrics::{RELAY_RESPONSES, RELAY_RESPONSE_TIME},
metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
state::{BuilderApiState, PbsState},
};

/// Implements https://ethereum.github.io/builder-specs/#/Builder/status
/// Broadcasts a status check to all relays and returns 200 if at least one relay returns 200
/// Broadcasts a status check to all relays and returns 200 if at least one
/// relay returns 200
pub async fn get_status<S: BuilderApiState>(
req_headers: HeaderMap,
state: PbsState<S>,
Expand Down Expand Up @@ -54,12 +56,12 @@ async fn send_relay_check(
) -> Result<(), PbsError> {
let url = relay.get_status_url();

let timer = RELAY_RESPONSE_TIME.with_label_values(&["get_status", &relay.id]).start_timer();
let timer = RELAY_LATENCY.with_label_values(&[STATUS_ENDPOINT_TAG, &relay.id]).start_timer();
let res = client.get(url).timeout(Duration::from_secs(30)).headers(headers).send().await?;
timer.observe_duration();

let status = res.status();
RELAY_RESPONSES.with_label_values(&[&status.to_string(), "get_status", &relay.id]).inc();
RELAY_STATUS_CODE.with_label_values(&[status.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
Expand Down
14 changes: 10 additions & 4 deletions crates/pbs/src/mev_boost/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use reqwest::header::USER_AGENT;
use tracing::warn;

use crate::{
constants::SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG,
error::{PbsError, ValidationError},
metrics::{RELAY_RESPONSES, RELAY_RESPONSE_TIME},
metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
state::{BuilderApiState, PbsState},
types::{SignedBlindedBeaconBlock, SubmitBlindedBlockResponse},
};
Expand Down Expand Up @@ -63,7 +64,8 @@ pub async fn submit_block<S: BuilderApiState>(
}
}

// submits blinded signed block and expects the execution payload + blobs bundle back
// submits blinded signed block and expects the execution payload + blobs bundle
// back
async fn send_submit_block(
headers: HeaderMap,
relay: RelayEntry,
Expand All @@ -73,7 +75,9 @@ async fn send_submit_block(
) -> Result<SubmitBlindedBlockResponse, PbsError> {
let url = relay.submit_block_url();

let timer = RELAY_RESPONSE_TIME.with_label_values(&["submit_block", &relay.id]).start_timer();
let timer = RELAY_LATENCY
.with_label_values(&[SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
.start_timer();
let res = client
.post(url)
.timeout(Duration::from_millis(config.timeout_get_payload_ms))
Expand All @@ -84,7 +88,9 @@ async fn send_submit_block(
timer.observe_duration();

let status = res.status();
RELAY_RESPONSES.with_label_values(&[&status.to_string(), "submit_block", &relay.id]).inc();
RELAY_STATUS_CODE
.with_label_values(&[status.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;
if !status.is_success() {
Expand Down
12 changes: 9 additions & 3 deletions crates/pbs/src/routes/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use uuid::Uuid;

use crate::{
boost::BuilderApi,
constants::GET_HEADER_ENDPOINT_TAG,
error::PbsClientError,
metrics::REQUESTS_RECEIVED,
metrics::BEACON_NODE_STATUS,
state::{BuilderApiState, PbsState},
BuilderEvent, GetHeaderParams,
};
Expand All @@ -21,7 +22,6 @@ pub async fn handle_get_header<S: BuilderApiState, T: BuilderApi<S>>(
req_headers: HeaderMap,
Path(params): Path<GetHeaderParams>,
) -> Result<impl IntoResponse, PbsClientError> {
REQUESTS_RECEIVED.with_label_values(&["get_header"]).inc();
state.publish_event(BuilderEvent::GetHeaderRequest(params));

let req_id = Uuid::new_v4();
Expand All @@ -37,16 +37,22 @@ pub async fn handle_get_header<S: BuilderApiState, T: BuilderApi<S>>(

if let Some(max_bid) = res {
info!(event ="get_header", %req_id, block_hash =% max_bid.data.message.header.block_hash, "header available for slot");
BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc();
Ok((StatusCode::OK, axum::Json(max_bid)).into_response())
} else {
// spec: return 204 if request is valid but no bid available
info!(event = "get_header", %req_id, "no header available for slot");
BEACON_NODE_STATUS.with_label_values(&["204", GET_HEADER_ENDPOINT_TAG]).inc();
Ok(StatusCode::NO_CONTENT.into_response())
}
}
Err(err) => {
error!(event = "get_header", %req_id, ?err, "failed relay get_header");
Err(PbsClientError::NoPayload)
let err = PbsClientError::NoPayload;
BEACON_NODE_STATUS
.with_label_values(&[err.status_code().as_str(), GET_HEADER_ENDPOINT_TAG])
.inc();
Err(err)
}
}
}
11 changes: 8 additions & 3 deletions crates/pbs/src/routes/register_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use uuid::Uuid;

use crate::{
boost::BuilderApi,
constants::REGISTER_VALIDATOR_ENDPOINT_TAG,
error::PbsClientError,
metrics::REQUESTS_RECEIVED,
metrics::BEACON_NODE_STATUS,
state::{BuilderApiState, PbsState},
BuilderEvent,
};
Expand All @@ -18,7 +19,6 @@ pub async fn handle_register_validator<S: BuilderApiState, T: BuilderApi<S>>(
req_headers: HeaderMap,
Json(registrations): Json<Vec<ValidatorRegistration>>,
) -> Result<impl IntoResponse, PbsClientError> {
REQUESTS_RECEIVED.with_label_values(&["register_validator"]).inc();
state.publish_event(BuilderEvent::RegisterValidatorRequest(registrations.clone()));

let req_id = Uuid::new_v4();
Expand All @@ -30,9 +30,14 @@ pub async fn handle_register_validator<S: BuilderApiState, T: BuilderApi<S>>(
state.publish_event(BuilderEvent::RegisterValidatorResponse);

error!(method = "register_validator", %req_id, ?err, "all relays failed register_validator");
Err(PbsClientError::NoResponse)
let err = PbsClientError::NoResponse;
BEACON_NODE_STATUS
.with_label_values(&[err.status_code().as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG])
.inc();
Err(err)
} else {
info!(event = "register_validator", %req_id, "register validator successful");
BEACON_NODE_STATUS.with_label_values(&["200", REGISTER_VALIDATOR_ENDPOINT_TAG]).inc();
Ok(StatusCode::OK)
}
}
Loading