From 7cbf1a2989120e36952985ee5564a3562755d6b1 Mon Sep 17 00:00:00 2001 From: ltitanb Date: Fri, 26 Jul 2024 14:57:45 +0100 Subject: [PATCH 1/5] small fixes --- crates/pbs/src/error.rs | 2 +- crates/pbs/src/mev_boost/get_header.rs | 11 ++++++++--- crates/pbs/src/mev_boost/submit_block.rs | 6 +++--- crates/pbs/src/routes/get_header.rs | 1 + 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/crates/pbs/src/error.rs b/crates/pbs/src/error.rs index 9b2c2a07..3b79a922 100644 --- a/crates/pbs/src/error.rs +++ b/crates/pbs/src/error.rs @@ -43,7 +43,7 @@ pub enum PbsError { #[error("serde decode error: {0}")] SerdeDecodeError(#[from] serde_json::Error), - #[error("relay response error. Code: {code}, text: {error_msg}")] + #[error("relay response error. Code: {code}, err: {error_msg}")] RelayResponse { error_msg: String, code: u16 }, #[error("failed validating relay response: {0}")] diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index 49b86bca..94de345b 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -33,7 +33,7 @@ pub async fn get_header( state: PbsState, ) -> eyre::Result> { let GetHeaderParams { slot, parent_hash, pubkey: validator_pubkey } = params; - let slot_uuid = state.get_or_update_slot_uuid(slot); + let (_, slot_uuid) = state.get_slot_and_uuid(); // prepare headers let mut send_headers = HeaderMap::new(); @@ -67,7 +67,13 @@ pub async fn get_header( match res { Ok(Some(res)) => relay_bids.push(res), Ok(_) => {} - Err(err) => error!(?err, relay_id), + Err(err) => match err { + PbsError::Reqwest(req_err) if req_err.is_timeout() => { + error!(err = "Timed Out", relay_id) + } + + _ => error!(?err, relay_id), + }, } } @@ -121,7 +127,6 @@ async fn send_get_header( let get_header_response: GetHeaderReponse = serde_json::from_slice(&response_bytes)?; debug!( - ?code, latency_ms, block_hash = %get_header_response.block_hash(), value_eth = format_ether(get_header_response.value()), diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index eb49ece5..2fb05609 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -8,7 +8,7 @@ use cb_common::{ }; use futures::future::select_ok; use reqwest::header::USER_AGENT; -use tracing::{debug, error}; +use tracing::{debug, warn}; use crate::{ constants::SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, @@ -90,14 +90,14 @@ async fn send_submit_block( code: code.as_u16(), }; - error!(?err, "failed submit block"); + // we request payload to all relays, but some may have not received it + warn!(?err, "failed to get payload (this might be ok if other relays have it)"); return Err(err) }; let block_response: SubmitBlindedBlockResponse = serde_json::from_slice(&response_bytes)?; debug!( - ?code, latency_ms, block_hash = %block_response.block_hash(), "received unblinded block" diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 082e43a5..ee707099 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -25,6 +25,7 @@ pub async fn handle_get_header>( Path(params): Path, ) -> Result { state.publish_event(BuilderEvent::GetHeaderRequest(params)); + state.get_or_update_slot_uuid(params.slot); let now = utcnow_ms(); let slot_start_ms = timestamp_of_slot_start_millis(params.slot, state.config.chain); From 00d4620b55b74df915f1d7d3ca02844caefa3bab Mon Sep 17 00:00:00 2001 From: ltitanb Date: Mon, 29 Jul 2024 19:10:46 +0100 Subject: [PATCH 2/5] add timing games --- .dockerignore | 1 + config.example.toml | 13 +- crates/common/src/commit/client.rs | 2 +- crates/common/src/config.rs | 57 ++-- crates/common/src/pbs/config.rs | 58 ++++ crates/common/src/pbs/constants.rs | 10 + crates/common/src/pbs/mod.rs | 4 +- crates/common/src/pbs/types.rs | 110 ++++++-- crates/common/src/types.rs | 2 +- crates/common/src/utils.rs | 13 +- crates/pbs/src/constants.rs | 4 + crates/pbs/src/mev_boost/get_header.rs | 265 +++++++++++++----- .../pbs/src/mev_boost/register_validator.rs | 49 ++-- crates/pbs/src/mev_boost/status.rs | 46 +-- crates/pbs/src/mev_boost/submit_block.rs | 57 ++-- crates/pbs/src/routes/get_header.rs | 17 +- crates/pbs/src/routes/submit_block.rs | 4 +- crates/pbs/src/state.rs | 48 +--- tests/src/mock_validator.rs | 38 ++- tests/src/utils.rs | 9 + tests/tests/config.rs | 10 + tests/tests/pbs_integration.rs | 53 ++-- 22 files changed, 570 insertions(+), 300 deletions(-) create mode 100644 .dockerignore create mode 100644 crates/common/src/pbs/config.rs create mode 100644 tests/tests/config.rs diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..1de56593 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +target \ No newline at end of file diff --git a/config.example.toml b/config.example.toml index ae0b0cdf..79a1b57d 100644 --- a/config.example.toml +++ b/config.example.toml @@ -2,7 +2,6 @@ chain = "Holesky" [pbs] port = 18550 -relays = [] relay_check = true timeout_get_header_ms = 950 timeout_get_payload_ms = 4000 @@ -10,8 +9,16 @@ timeout_register_validator_ms = 3000 skip_sigverify = true min_bid_eth = 0.0 -[headers] -X-MyCustomHeader = "MyCustomValue" +late_in_slot_time_ms = 2000 +skip_header_late_in_slot = true + +[[relays]] +id = "example-relay" +url = "http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz" +headers = { X-MyCustomHeader = "MyCustomValue" } +enable_timing_games = false +wait_first_header_ms = 250 +frequency_get_header_ms = 100 [signer] [signer.loader] diff --git a/crates/common/src/commit/client.rs b/crates/common/src/commit/client.rs index 9b6f7c27..b452fa4e 100644 --- a/crates/common/src/commit/client.rs +++ b/crates/common/src/commit/client.rs @@ -21,7 +21,7 @@ pub struct GetPubkeysResponse { #[derive(Debug, Clone)] pub struct SignerClient { /// Url endpoint of the Signer Module - url: Arc, + url: Arc, client: reqwest::Client, } diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index e037dd87..dd8aad42 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -1,11 +1,15 @@ use std::{collections::HashMap, sync::Arc}; -use alloy::primitives::U256; use eyre::{eyre, ContextCompat}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use super::utils::as_eth_str; -use crate::{commit::client::SignerClient, loader::SignerLoader, pbs::RelayEntry, types::Chain}; +use crate::{ + commit::client::SignerClient, + loader::SignerLoader, + pbs::{PbsConfig, RelayClient, RelayConfig}, + types::Chain, + utils::default_bool, +}; pub const MODULE_ID_ENV: &str = "CB_MODULE_ID"; pub const MODULE_JWT_ENV: &str = "CB_SIGNER_JWT"; @@ -32,6 +36,7 @@ pub const SIGNER_IMAGE: &str = "commitboost_signer"; pub struct CommitBoostConfig { // TODO: generalize this with a spec file pub chain: Chain, + pub relays: Vec, pub pbs: StaticPbsConfig, pub modules: Option>, pub signer: Option, @@ -136,55 +141,23 @@ pub struct StaticPbsConfig { pub with_signer: bool, } -#[derive(Debug, Clone, Default, Deserialize, Serialize)] -pub struct PbsConfig { - /// Port to receive BuilderAPI calls from CL - pub port: u16, - /// Which relay to register/subscribe to - pub relays: Vec, - /// Whether to forward getStatus to relays or skip it - pub relay_check: bool, - #[serde(default = "default_u64::<950>")] - pub timeout_get_header_ms: u64, - #[serde(default = "default_u64::<4000>")] - pub timeout_get_payload_ms: u64, - #[serde(default = "default_u64::<3000>")] - pub timeout_register_validator_ms: u64, - /// Whether to skip the relay signature verification - #[serde(default = "default_bool::")] - pub skip_sigverify: bool, - /// Minimum bid that will be accepted from get_header - #[serde(rename = "min_bid_eth", with = "as_eth_str", default = "default_u256")] - pub min_bid_wei: U256, - /// Custom headers to send to relays - pub headers: Option>, -} - /// Runtime config for the pbs module with support for custom extra config +/// This will be shared across threads, so the `extra` should be thread safe, +/// e.g. wrapped in an Arc #[derive(Debug, Clone)] pub struct PbsModuleConfig { /// Chain spec pub chain: Chain, /// Pbs default config pub pbs_config: Arc, + /// List of relays + pub relays: Vec, /// Signer client to call Signer API pub signer_client: Option, /// Opaque module config pub extra: T, } -const fn default_u64() -> u64 { - U -} - -const fn default_bool() -> bool { - U -} - -const fn default_u256() -> U256 { - U256::ZERO -} - fn default_pbs() -> String { PBS_DEFAULT_IMAGE.to_string() } @@ -192,9 +165,12 @@ fn default_pbs() -> String { /// Loads the default pbs config, i.e. with no signer client or custom data pub fn load_pbs_config() -> eyre::Result> { let config = CommitBoostConfig::from_env_path(); + let relay_clients = config.relays.into_iter().map(RelayClient::new).collect(); + Ok(PbsModuleConfig { chain: config.chain, pbs_config: Arc::new(config.pbs.pbs_config), + relays: relay_clients, signer_client: None, extra: (), }) @@ -213,11 +189,13 @@ pub fn load_pbs_custom_config() -> eyre::Result { chain: Chain, + relays: Vec, pbs: CustomPbsConfig, } // load module config including the extra data (if any) let cb_config: StubConfig = load_file_from_env(CB_CONFIG_ENV); + let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect(); let signer_client = if cb_config.pbs.static_config.with_signer { // if custom pbs requires a signer client, load jwt @@ -231,6 +209,7 @@ pub fn load_pbs_custom_config() -> eyre::Result, + /// Relay in the form of pubkey@url + #[serde(rename = "url")] + pub entry: RelayEntry, + /// Optional headers to send with each request + pub headers: Option>, + /// Whether to enable timing games + pub enable_timing_games: bool, + /// Delay in ms to wait before sending the first get_header + pub wait_first_header_ms: Option, + /// Frequency in ms to send get_header requests + pub frequency_get_header_ms: Option, +} + +#[derive(Debug, Clone, Default, Deserialize, Serialize)] +pub struct PbsConfig { + /// Port to receive BuilderAPI calls from beacon node + pub port: u16, + /// Whether to forward `get_status`` to relays or skip it + pub relay_check: bool, + /// Timeout for get_header request in milliseconds + #[serde(default = "default_u64::<{ DefaultTimeout::GET_HEADER_MS }>")] + pub timeout_get_header_ms: u64, + /// Timeout for get_payload request in milliseconds + #[serde(default = "default_u64::<{ DefaultTimeout::GET_PAYLOAD_MS }>")] + pub timeout_get_payload_ms: u64, + /// Timeout for register_validator request in milliseconds + #[serde(default = "default_u64::<{ DefaultTimeout::REGISTER_VALIDATOR_MS }>")] + pub timeout_register_validator_ms: u64, + /// Whether to skip the relay signature verification + #[serde(default = "default_bool::")] + pub skip_sigverify: bool, + /// Minimum bid that will be accepted from get_header + #[serde(rename = "min_bid_eth", with = "as_eth_str", default = "default_u256")] + pub min_bid_wei: U256, + /// How late in the slot we consider to be "late" + #[serde(default = "default_u64::")] + pub late_in_slot_time_ms: u64, + /// If it's too late in the slot, skip get header and force local build + #[serde(default = "default_bool::")] + pub skip_header_late_in_slot: bool, +} diff --git a/crates/common/src/pbs/constants.rs b/crates/common/src/pbs/constants.rs index ce3f0306..80f68a18 100644 --- a/crates/common/src/pbs/constants.rs +++ b/crates/common/src/pbs/constants.rs @@ -14,3 +14,13 @@ pub const HEADER_START_TIME_UNIX_MS: &str = "X-MEVBoost-StartTimeUnixMS"; pub const BUILDER_EVENTS_PATH: &str = "/events"; pub const DEFAULT_PBS_JWT_KEY: &str = "DEFAULT_PBS"; + +#[non_exhaustive] +pub struct DefaultTimeout; +impl DefaultTimeout { + pub const GET_HEADER_MS: u64 = 950; + pub const GET_PAYLOAD_MS: u64 = 4000; + pub const REGISTER_VALIDATOR_MS: u64 = 3000; +} + +pub const LATE_IN_SLOT_TIME_MS: u64 = 2000; diff --git a/crates/common/src/pbs/mod.rs b/crates/common/src/pbs/mod.rs index 174564b0..d4b0a907 100644 --- a/crates/common/src/pbs/mod.rs +++ b/crates/common/src/pbs/mod.rs @@ -1,5 +1,7 @@ +mod config; mod constants; mod types; +pub use config::*; pub use constants::*; -pub use types::RelayEntry; +pub use types::*; diff --git a/crates/common/src/pbs/types.rs b/crates/common/src/pbs/types.rs index b41c59aa..f770afd3 100644 --- a/crates/common/src/pbs/types.rs +++ b/crates/common/src/pbs/types.rs @@ -1,24 +1,99 @@ +use std::{str::FromStr, sync::Arc}; + use alloy::{ primitives::{hex::FromHex, B256}, rpc::types::beacon::BlsPublicKey, }; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::{Deserialize, Serialize}; use url::Url; -use super::constants::{ - BULDER_API_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, +use super::{ + constants::{BULDER_API_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH}, + RelayConfig, HEADER_VERSION_KEY, HEAVER_VERSION_VALUE, }; - +use crate::DEFAULT_REQUEST_TIMEOUT; +/// A parsed entry of the relay url in the format: scheme://pubkey@host #[derive(Debug, Default, Clone)] pub struct RelayEntry { + /// Default if of the relay, the hostname of the url pub id: String, + /// Public key of the relay pub pubkey: BlsPublicKey, + /// Full url of the relay pub url: String, } -impl RelayEntry { - fn get_url(&self, path: &str) -> String { - format!("{}{path}", &self.url) +impl Serialize for RelayEntry { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.url) + } +} + +impl<'de> Deserialize<'de> for RelayEntry { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let str = String::deserialize(deserializer)?; + let url = Url::parse(&str).map_err(serde::de::Error::custom)?; + let pubkey = BlsPublicKey::from_hex(url.username()).map_err(serde::de::Error::custom)?; + let id = url.host().ok_or(serde::de::Error::custom("missing host"))?.to_string(); + + Ok(RelayEntry { pubkey, url: str, id }) + } +} + +/// A client to interact with a relay, safe to share across threads +#[derive(Debug, Clone)] +pub struct RelayClient { + /// ID of the relay + pub id: Arc, + /// HTTP client to send requests + pub client: reqwest::Client, + /// Configuration of the relay + pub config: Arc, +} + +impl RelayClient { + pub fn new(config: RelayConfig) -> Self { + let mut headers = HeaderMap::new(); + headers.insert(HEADER_VERSION_KEY, HeaderValue::from_static(HEAVER_VERSION_VALUE)); + + if let Some(custom_headers) = &config.headers { + for (key, value) in custom_headers { + headers.insert( + HeaderName::from_str(key) + .unwrap_or_else(|_| panic!("{key} is an invalid header name")), + HeaderValue::from_str(value) + .unwrap_or_else(|_| panic!("{key} has an invalid header value")), + ); + } + } + + let client = reqwest::Client::builder() + .default_headers(headers) + .timeout(DEFAULT_REQUEST_TIMEOUT) + .build() + .expect("failed to build relay client"); + + Self { + id: Arc::new(config.id.clone().unwrap_or(config.entry.id.clone())), + client, + config: Arc::new(config), + } + } + + pub fn pubkey(&self) -> BlsPublicKey { + self.config.entry.pubkey + } + + // URL builders + pub fn get_url(&self, path: &str) -> String { + format!("{}{path}", &self.config.entry.url) } pub fn get_header_url( @@ -43,29 +118,6 @@ impl RelayEntry { } } -impl Serialize for RelayEntry { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&self.url) - } -} - -impl<'de> Deserialize<'de> for RelayEntry { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let str = String::deserialize(deserializer)?; - let url = Url::parse(&str).map_err(serde::de::Error::custom)?; - let pubkey = BlsPublicKey::from_hex(url.username()).map_err(serde::de::Error::custom)?; - let id = url.host().ok_or(serde::de::Error::custom("missing host"))?.to_string(); - - Ok(RelayEntry { pubkey, url: str, id }) - } -} - #[cfg(test)] mod tests { use alloy::{primitives::hex::FromHex, rpc::types::beacon::BlsPublicKey}; diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index 043782ab..6767164b 100644 --- a/crates/common/src/types.rs +++ b/crates/common/src/types.rs @@ -7,7 +7,7 @@ use crate::constants::{ RHEA_BUILDER_DOMAIN, RHEA_FORK_VERSION, RHEA_GENESIS_TIME_SECONDS, }; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum Chain { Mainnet, Holesky, diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index 1ffb70d2..c0b779b8 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -16,7 +16,6 @@ const MILLIS_PER_SECOND: u64 = 1_000; pub fn timestamp_of_slot_start_millis(slot: u64, chain: Chain) -> u64 { let seconds_since_genesis = chain.genesis_time_sec() + slot * SECONDS_PER_SLOT; - seconds_since_genesis * MILLIS_PER_SECOND } @@ -94,6 +93,18 @@ pub mod as_eth_str { } } +pub const fn default_u64() -> u64 { + U +} + +pub const fn default_bool() -> bool { + U +} + +pub const fn default_u256() -> U256 { + U256::ZERO +} + // LOGGING // TODO: more customized logging + logging guard pub fn initialize_tracing_log() { diff --git a/crates/pbs/src/constants.rs b/crates/pbs/src/constants.rs index 050be538..d0d4d8d6 100644 --- a/crates/pbs/src/constants.rs +++ b/crates/pbs/src/constants.rs @@ -2,3 +2,7 @@ pub(crate) const STATUS_ENDPOINT_TAG: &str = "status"; pub(crate) const REGISTER_VALIDATOR_ENDPOINT_TAG: &str = "register_validator"; pub(crate) const SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG: &str = "submit_blinded_block"; pub(crate) const GET_HEADER_ENDPOINT_TAG: &str = "get_header"; + +/// For metrics recorded when a request times out +pub(crate) const TIMEOUT_ERROR_CODE: u16 = 555; +pub(crate) const TIMEOUT_ERROR_CODE_STR: &str = "555"; diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index 94de345b..02741191 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -1,4 +1,4 @@ -use std::{ops::Mul, sync::Arc, time::Duration}; +use std::time::{Duration, Instant}; use alloy::{ primitives::{utils::format_ether, B256, U256}, @@ -6,18 +6,17 @@ use alloy::{ }; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ - config::PbsConfig, - pbs::{RelayEntry, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, + pbs::{PbsConfig, RelayClient, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, signature::verify_signed_builder_message, types::Chain, utils::{get_user_agent, utcnow_ms}, }; use futures::future::join_all; use reqwest::{header::USER_AGENT, StatusCode}; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::{ - constants::GET_HEADER_ENDPOINT_TAG, + constants::{GET_HEADER_ENDPOINT_TAG, TIMEOUT_ERROR_CODE, TIMEOUT_ERROR_CODE_STR}, error::{PbsError, ValidationError}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, @@ -32,14 +31,11 @@ pub async fn get_header( req_headers: HeaderMap, state: PbsState, ) -> eyre::Result> { - let GetHeaderParams { slot, parent_hash, pubkey: validator_pubkey } = params; let (_, slot_uuid) = state.get_slot_and_uuid(); - // prepare headers + // prepare headers, except for start time which is set in `send_one_get_header` let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); - send_headers - .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); if let Some(ua) = get_user_agent(&req_headers) { send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?); } @@ -47,61 +43,143 @@ pub async fn get_header( let relays = state.relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { - handles.push(send_get_header( - send_headers.clone(), - slot, - parent_hash, - validator_pubkey, - relay.clone(), + handles.push(send_timed_get_header( + params, + relay, state.config.chain, - state.config.pbs_config.clone(), - state.relay_client(), + state.pbs_config(), + send_headers.clone(), )); } let results = join_all(handles).await; let mut relay_bids = Vec::with_capacity(relays.len()); for (i, res) in results.into_iter().enumerate() { - let relay_id = relays[i].id.clone(); + let relay_id = relays[i].id.as_ref(); match res { Ok(Some(res)) => relay_bids.push(res), Ok(_) => {} - Err(err) => match err { - PbsError::Reqwest(req_err) if req_err.is_timeout() => { - error!(err = "Timed Out", relay_id) - } - - _ => error!(?err, relay_id), - }, + Err(PbsError::Reqwest(req_err)) if req_err.is_timeout() => { + error!(err = "Timed Out", relay_id) + } + Err(err) => error!(?err, relay_id), } } - Ok(state.add_bids(slot, relay_bids)) + Ok(state.add_bids(params.slot, relay_bids)) } -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] -async fn send_get_header( - headers: HeaderMap, - slot: u64, - parent_hash: B256, - validator_pubkey: BlsPublicKey, - relay: RelayEntry, +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] +async fn send_timed_get_header( + params: GetHeaderParams, + relay: &RelayClient, chain: Chain, - config: Arc, - client: reqwest::Client, + pbs_config: &PbsConfig, + headers: HeaderMap, ) -> Result, PbsError> { - let url = relay.get_header_url(slot, parent_hash, validator_pubkey); + let url = relay.get_header_url(params.slot, params.parent_hash, params.pubkey); + + if relay.config.enable_timing_games { + if let Some(delay) = relay.config.wait_first_header_ms { + debug!(delay, "TG: waiting to send first header request"); + tokio::time::sleep(Duration::from_millis(delay)).await; + } - let timer = - RELAY_LATENCY.with_label_values(&[GET_HEADER_ENDPOINT_TAG, &relay.id]).start_timer(); - let res = client + if let Some(send_freq_ms) = relay.config.frequency_get_header_ms { + let start_time_ms = Instant::now(); + let mut handles = Vec::new(); + let max_timeout_ms = pbs_config + .timeout_get_header_ms + .saturating_sub(relay.config.wait_first_header_ms.unwrap_or_default()); + + debug!(send_freq_ms, max_timeout_ms, "TG: sending multiple header requests"); + + loop { + let timeout_left_ms = + max_timeout_ms.saturating_sub(start_time_ms.elapsed().as_millis() as u64); + + handles.push(send_one_get_header( + url.clone(), + params, + relay, + chain, + pbs_config, + headers.clone(), + Some(timeout_left_ms), + )); + + if timeout_left_ms > send_freq_ms { + // enough time for one more + tokio::time::sleep(Duration::from_millis(send_freq_ms)).await; + } else { + break; + } + } + + let results = join_all(handles).await; + let n_headers = results.iter().filter(|header| header.is_ok()).count(); + + if let Some((_, maybe_header)) = + results.into_iter().filter_map(|r| r.ok()).max_by_key(|(start_time, _)| *start_time) + { + debug!(n_headers, "TG: received headers from relay"); + return Ok(maybe_header) + } else { + // all requests failed (likely all timeout) + warn!("TG: no headers received"); + + return Err(PbsError::RelayResponse { + error_msg: "no headers received".to_string(), + code: TIMEOUT_ERROR_CODE, + }) + } + } + } + + // if no timing games or no repeated send, just send one request + send_one_get_header(url, params, relay, chain, pbs_config, headers, None) + .await + .map(|(_, maybe_header)| maybe_header) +} + +async fn send_one_get_header( + url: String, + params: GetHeaderParams, + relay: &RelayClient, + chain: Chain, + pbs_config: &PbsConfig, + mut headers: HeaderMap, + timeout_ms: Option, +) -> Result<(u64, Option), PbsError> { + // the timestamp in the header is the consensus block time which is fixed, + // use the beginning of the request as proxy to make sure we use only the + // last one received + let start_request_time = utcnow_ms(); + headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(start_request_time)); + + let start_request = Instant::now(); + let res = match relay + .client .get(url) - .timeout(Duration::from_millis(config.timeout_get_header_ms)) + .timeout(Duration::from_millis(timeout_ms.unwrap_or(pbs_config.timeout_get_header_ms))) .headers(headers) .send() - .await?; - let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[TIMEOUT_ERROR_CODE_STR, GET_HEADER_ENDPOINT_TAG, &relay.id]) + .inc(); + return Err(err.into()); + } + }; + + let request_latency = start_request.elapsed(); + RELAY_LATENCY + .with_label_values(&[GET_HEADER_ENDPOINT_TAG, &relay.id]) + .observe(request_latency.as_secs_f64()); let code = res.status(); RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc(); @@ -117,17 +195,17 @@ async fn send_get_header( if code == StatusCode::NO_CONTENT { debug!( ?code, - latency_ms, + ?request_latency, response = ?response_bytes, "no header from relay" ); - return Ok(None) + return Ok((start_request_time, None)) } let get_header_response: GetHeaderReponse = serde_json::from_slice(&response_bytes)?; debug!( - latency_ms, + ?request_latency, block_hash = %get_header_response.block_hash(), value_eth = format_ether(get_header_response.value()), "received new header" @@ -136,25 +214,25 @@ async fn send_get_header( validate_header( &get_header_response.data, chain, - &relay, - parent_hash, - config.skip_sigverify, - config.min_bid_wei, + relay.pubkey(), + params.parent_hash, + pbs_config.skip_sigverify, + pbs_config.min_bid_wei, )?; - Ok(Some(get_header_response)) + Ok((start_request_time, Some(get_header_response))) } fn validate_header( signed_header: &SignedExecutionPayloadHeader, chain: Chain, - relay: &RelayEntry, + expected_relay_pubkey: BlsPublicKey, parent_hash: B256, skip_sig_verify: bool, minimum_bid_wei: U256, ) -> Result<(), ValidationError> { let block_hash = signed_header.message.header.block_hash; - let relay_pubkey = signed_header.message.pubkey; + let received_relay_pubkey = signed_header.message.pubkey; let tx_root = signed_header.message.header.transactions_root; let value = signed_header.message.value(); @@ -177,14 +255,17 @@ fn validate_header( return Err(ValidationError::BidTooLow { min: minimum_bid_wei, got: value }); } - if relay.pubkey != relay_pubkey { - return Err(ValidationError::PubkeyMismatch { expected: relay.pubkey, got: relay_pubkey }) + if expected_relay_pubkey != received_relay_pubkey { + return Err(ValidationError::PubkeyMismatch { + expected: expected_relay_pubkey, + got: received_relay_pubkey, + }) } if !skip_sig_verify { verify_signed_builder_message( chain, - &relay_pubkey, + &received_relay_pubkey, &signed_header.message, &signed_header.signature, ) @@ -201,7 +282,7 @@ mod tests { rpc::types::beacon::BlsPublicKey, }; use blst::min_pk; - use cb_common::{pbs::RelayEntry, signature::sign_builder_message, types::Chain}; + use cb_common::{signature::sign_builder_message, types::Chain}; use super::validate_header; use crate::{ @@ -212,23 +293,44 @@ mod tests { #[test] fn test_validate_header() { let mut mock_header = SignedExecutionPayloadHeader::default(); - let mut mock_relay = RelayEntry::default(); + let parent_hash = B256::from_slice(&[1; 32]); let chain = Chain::Holesky; let min_bid = U256::ZERO; + let secret_key = min_pk::SecretKey::from_bytes(&[ + 0, 136, 227, 100, 165, 57, 106, 129, 181, 15, 235, 189, 200, 120, 70, 99, 251, 144, + 137, 181, 230, 124, 189, 193, 115, 153, 26, 0, 197, 135, 103, 63, + ]) + .unwrap(); + let pubkey = BlsPublicKey::from_slice(&secret_key.sk_to_pk().to_bytes()); + mock_header.message.header.transactions_root = alloy::primitives::FixedBytes(EMPTY_TX_ROOT_HASH); assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::EmptyBlockhash) ); mock_header.message.header.block_hash.0[1] = 1; assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::ParentHashMismatch { expected: parent_hash, got: B256::default() @@ -238,46 +340,55 @@ mod tests { mock_header.message.header.parent_hash = parent_hash; assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::EmptyTxRoot) ); mock_header.message.header.transactions_root = Default::default(); assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::BidTooLow { min: min_bid, got: U256::ZERO }) ); mock_header.message.set_value(U256::from(1)); - let secret_key = min_pk::SecretKey::from_bytes(&[ - 0, 136, 227, 100, 165, 57, 106, 129, 181, 15, 235, 189, 200, 120, 70, 99, 251, 144, - 137, 181, 230, 124, 189, 193, 115, 153, 26, 0, 197, 135, 103, 63, - ]) - .unwrap(); - let pubkey = BlsPublicKey::from_slice(&secret_key.sk_to_pk().to_bytes()); mock_header.message.pubkey = pubkey; assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::PubkeyMismatch { expected: BlsPublicKey::default(), got: pubkey }) ); - mock_relay.pubkey = pubkey; - assert!(matches!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header(&mock_header, chain, pubkey, parent_hash, false, min_bid), Err(ValidationError::Sigverify(_)) )); - assert!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, true, min_bid).is_ok() - ); + assert!(validate_header(&mock_header, chain, pubkey, parent_hash, true, min_bid).is_ok()); mock_header.signature = sign_builder_message(chain, &secret_key, &mock_header.message); - assert!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid).is_ok() - ) + assert!(validate_header(&mock_header, chain, pubkey, parent_hash, false, min_bid).is_ok()) } } diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 98e63fbb..db832c89 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -1,9 +1,9 @@ -use std::{ops::Mul, time::Duration}; +use std::time::{Duration, Instant}; use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ - pbs::{RelayEntry, HEADER_START_TIME_UNIX_MS}, + pbs::{RelayClient, HEADER_START_TIME_UNIX_MS}, utils::{get_user_agent, utcnow_ms}, }; use eyre::bail; @@ -12,7 +12,7 @@ use reqwest::header::USER_AGENT; use tracing::{debug, error}; use crate::{ - constants::REGISTER_VALIDATOR_ENDPOINT_TAG, + constants::{REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, error::PbsError, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, @@ -37,11 +37,10 @@ pub async fn register_validator( let mut handles = Vec::with_capacity(relays.len()); for relay in relays { handles.push(send_register_validator( - send_headers.clone(), - relay.clone(), registrations.clone(), - state.config.pbs_config.timeout_register_validator_ms, - state.relay_client(), + relay, + send_headers.clone(), + state.pbs_config().timeout_register_validator_ms, )); } @@ -54,27 +53,41 @@ pub async fn register_validator( } } -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] async fn send_register_validator( - headers: HeaderMap, - relay: RelayEntry, registrations: Vec, + relay: &RelayClient, + headers: HeaderMap, timeout_ms: u64, - client: reqwest::Client, ) -> Result<(), PbsError> { let url = relay.register_validator_url(); - let timer = RELAY_LATENCY - .with_label_values(&[REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id]) - .start_timer(); - let res = client + let start_request = Instant::now(); + let res = match relay + .client .post(url) .timeout(Duration::from_millis(timeout_ms)) .headers(headers) .json(®istrations) .send() - .await?; - let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[ + TIMEOUT_ERROR_CODE_STR, + REGISTER_VALIDATOR_ENDPOINT_TAG, + &relay.id, + ]) + .inc(); + return Err(err.into()); + } + }; + let request_latency = start_request.elapsed(); + RELAY_LATENCY + .with_label_values(&[REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id]) + .observe(request_latency.as_secs_f64()); let code = res.status(); RELAY_STATUS_CODE @@ -93,7 +106,7 @@ async fn send_register_validator( return Err(err); }; - debug!(?code, latency_ms, "registration successful"); + debug!(?code, ?request_latency, "registration successful"); Ok(()) } diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index 5f71359d..b710a529 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -1,13 +1,13 @@ -use std::{ops::Mul, time::Duration}; +use std::time::{Duration, Instant}; use axum::http::{HeaderMap, HeaderValue}; -use cb_common::{pbs::RelayEntry, utils::get_user_agent}; +use cb_common::{pbs::RelayClient, utils::get_user_agent}; use futures::future::select_ok; use reqwest::header::USER_AGENT; use tracing::{debug, error}; use crate::{ - constants::STATUS_ENDPOINT_TAG, + constants::{STATUS_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, error::PbsError, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, @@ -33,11 +33,7 @@ pub async fn get_status( let relays = state.relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays { - handles.push(Box::pin(send_relay_check( - send_headers.clone(), - relay.clone(), - state.relay_client(), - ))); + handles.push(Box::pin(send_relay_check(relay, send_headers.clone()))); } // return ok if at least one relay returns 200 @@ -49,17 +45,31 @@ pub async fn get_status( } } -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] -async fn send_relay_check( - headers: HeaderMap, - relay: RelayEntry, - client: reqwest::Client, -) -> Result<(), PbsError> { +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] +async fn send_relay_check(relay: &RelayClient, headers: HeaderMap) -> Result<(), PbsError> { let url = relay.get_status_url(); - let timer = RELAY_LATENCY.with_label_values(&[STATUS_ENDPOINT_TAG, &relay.id]).start_timer(); - let res = client.get(url).timeout(Duration::from_secs(30)).headers(headers).send().await?; - let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; + let start_request = Instant::now(); + let res = match relay + .client + .get(url) + .timeout(Duration::from_secs(30)) + .headers(headers) + .send() + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[TIMEOUT_ERROR_CODE_STR, STATUS_ENDPOINT_TAG, &relay.id]) + .inc(); + return Err(err.into()) + } + }; + let request_latency = start_request.elapsed(); + RELAY_LATENCY + .with_label_values(&[STATUS_ENDPOINT_TAG, &relay.id]) + .observe(request_latency.as_secs_f64()); let code = res.status(); RELAY_STATUS_CODE.with_label_values(&[code.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc(); @@ -75,7 +85,7 @@ async fn send_relay_check( return Err(err) }; - debug!(?code, latency_ms, "status passed"); + debug!(?code, ?request_latency, "status passed"); Ok(()) } diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 2fb05609..84bc5830 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -1,9 +1,8 @@ -use std::{ops::Mul, sync::Arc, time::Duration}; +use std::time::{Duration, Instant}; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ - config::PbsConfig, - pbs::{RelayEntry, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, + pbs::{RelayClient, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, utils::{get_user_agent, utcnow_ms}, }; use futures::future::select_ok; @@ -11,7 +10,7 @@ use reqwest::header::USER_AGENT; use tracing::{debug, warn}; use crate::{ - constants::SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, + constants::{SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, error::{PbsError, ValidationError}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, @@ -29,8 +28,7 @@ pub async fn submit_block( // prepare headers let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); - send_headers - .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); + send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms())); if let Some(ua) = get_user_agent(&req_headers) { send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?); } @@ -39,11 +37,10 @@ pub async fn submit_block( let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { handles.push(Box::pin(send_submit_block( - send_headers.clone(), - relay.clone(), &signed_blinded_block, - state.config.pbs_config.clone(), - state.relay_client(), + relay, + send_headers.clone(), + state.config.pbs_config.timeout_get_payload_ms, ))); } @@ -56,27 +53,41 @@ pub async fn submit_block( // submits blinded signed block and expects the execution payload + blobs bundle // back -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] async fn send_submit_block( - headers: HeaderMap, - relay: RelayEntry, signed_blinded_block: &SignedBlindedBeaconBlock, - config: Arc, - client: reqwest::Client, + relay: &RelayClient, + headers: HeaderMap, + timeout_ms: u64, ) -> Result { let url = relay.submit_block_url(); - let timer = RELAY_LATENCY - .with_label_values(&[SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) - .start_timer(); - let res = client + let start_request = Instant::now(); + let res = match relay + .client .post(url) - .timeout(Duration::from_millis(config.timeout_get_payload_ms)) + .timeout(Duration::from_millis(timeout_ms)) .headers(headers) .json(&signed_blinded_block) .send() - .await?; - let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[ + TIMEOUT_ERROR_CODE_STR, + SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, + &relay.id, + ]) + .inc(); + return Err(err.into()) + } + }; + let request_latency = start_request.elapsed(); + RELAY_LATENCY + .with_label_values(&[SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) + .observe(request_latency.as_secs_f64()); let code = res.status(); RELAY_STATUS_CODE @@ -98,7 +109,7 @@ async fn send_submit_block( let block_response: SubmitBlindedBlockResponse = serde_json::from_slice(&response_bytes)?; debug!( - latency_ms, + ?request_latency, block_hash = %block_response.block_hash(), "received unblinded block" ); diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index ee707099..717d5879 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -6,7 +6,7 @@ use axum::{ }; use cb_common::utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}; use reqwest::StatusCode; -use tracing::{error, info}; +use tracing::{error, info, warn}; use uuid::Uuid; use crate::{ @@ -27,11 +27,22 @@ pub async fn handle_get_header>( state.publish_event(BuilderEvent::GetHeaderRequest(params)); state.get_or_update_slot_uuid(params.slot); - let now = utcnow_ms(); let slot_start_ms = timestamp_of_slot_start_millis(params.slot, state.config.chain); let ua = get_user_agent(&req_headers); - info!(?ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot=now.saturating_sub(slot_start_ms)); + let ms_into_slot = utcnow_ms().saturating_sub(slot_start_ms); + + info!(?ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot); + if state.is_late_in_slot(ms_into_slot) && state.pbs_config().skip_header_late_in_slot { + // too late into slot, force local build + warn!( + ms_into_slot, + threshold = state.pbs_config().late_in_slot_time_ms, + "late in slot, skipping relay requests" + ); + BEACON_NODE_STATUS.with_label_values(&["204", GET_HEADER_ENDPOINT_TAG]).inc(); + return Ok(StatusCode::NO_CONTENT.into_response()); + } match T::get_header(params, req_headers, state.clone()).await { Ok(res) => { diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 4c7c17fe..49360826 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -51,8 +51,8 @@ pub async fn handle_submit_block>( let fault_relays = state .relays() .iter() - .filter(|relay| fault_pubkeys.contains(&relay.pubkey)) - .map(|relay| relay.id.clone()) + .filter(|relay| fault_pubkeys.contains(&relay.pubkey())) + .map(|relay| &**relay.id) .collect::>() .join(","); diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index c270ace4..870d1f1c 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -1,15 +1,13 @@ use std::{ collections::HashSet, - str::FromStr, + fmt, sync::{Arc, Mutex}, }; use alloy::{primitives::B256, rpc::types::beacon::BlsPublicKey}; -use axum::http::{HeaderMap, HeaderName, HeaderValue}; use cb_common::{ - config::{PbsConfig, PbsModuleConfig}, - pbs::{RelayEntry, HEADER_VERSION_KEY, HEAVER_VERSION_VALUE}, - DEFAULT_REQUEST_TIMEOUT, + config::PbsModuleConfig, + pbs::{PbsConfig, RelayClient}, }; use dashmap::DashMap; use tokio::sync::broadcast; @@ -17,7 +15,7 @@ use uuid::Uuid; use crate::{types::GetHeaderReponse, BuilderEvent}; -pub trait BuilderApiState: std::fmt::Debug + Default + Clone + Sync + Send + 'static {} +pub trait BuilderApiState: fmt::Debug + Default + Clone + Sync + Send + 'static {} impl BuilderApiState for () {} pub type BuilderEventReceiver = broadcast::Receiver; @@ -28,11 +26,9 @@ pub type BuilderEventReceiver = broadcast::Receiver; #[derive(Debug, Clone)] pub struct PbsState { /// Config data for the Pbs service - pub config: Arc>, + pub config: PbsModuleConfig, /// Opaque extra data for library use pub data: S, - /// Relay client to reuse across requests - relay_client: reqwest::Client, /// Pubsliher for builder events event_publisher: broadcast::Sender, /// Info about the latest slot and its uuid @@ -48,28 +44,9 @@ where pub fn new(config: PbsModuleConfig) -> Self { let (tx, _) = broadcast::channel(10); - let mut headers = HeaderMap::new(); - headers.insert(HEADER_VERSION_KEY, HeaderValue::from_static(HEAVER_VERSION_VALUE)); - - if let Some(custom_headers) = &config.pbs_config.headers { - for (key, value) in custom_headers.iter() { - headers.insert( - HeaderName::from_str(key).expect("invalid header name"), - HeaderValue::from_str(&value).expect("invalid header value"), - ); - } - } - - let relay_client = reqwest::Client::builder() - .default_headers(headers) - .timeout(DEFAULT_REQUEST_TIMEOUT) - .build() - .expect("failed to build relay client"); - Self { - config: Arc::new(config), + config, data: S::default(), - relay_client, event_publisher: tx, current_slot_info: Arc::new(Mutex::new((0, Uuid::default()))), bid_cache: Arc::new(DashMap::new()), @@ -105,21 +82,20 @@ where *guard } + pub fn is_late_in_slot(&self, ms_into_slot: u64) -> bool { + ms_into_slot > self.config.pbs_config.late_in_slot_time_ms + } + // Getters pub fn pbs_config(&self) -> &PbsConfig { &self.config.pbs_config } - pub fn relays(&self) -> &[RelayEntry] { - &self.pbs_config().relays - } - pub fn relay_client(&self) -> reqwest::Client { - self.relay_client.clone() + pub fn relays(&self) -> &[RelayClient] { + &self.config.relays } /// Add some bids to the cache, the bids are all assumed to be for the /// provided slot Returns the bid with the max value - /// TODO: this doesnt handle cancellations if we call multiple times - /// get_header pub fn add_bids(&self, slot: u64, bids: Vec) -> Option { let mut slot_entry = self.bid_cache.entry(slot).or_default(); slot_entry.extend(bids); diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index 75e78369..67d332d2 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -1,33 +1,25 @@ -use std::net::SocketAddr; - -use alloy::primitives::B256; -use alloy::rpc::types::beacon::{relay::ValidatorRegistration, BlsPublicKey}; -use cb_common::pbs::RelayEntry; +use alloy::{ + primitives::B256, + rpc::types::beacon::{relay::ValidatorRegistration, BlsPublicKey}, +}; +use cb_common::pbs::RelayClient; use cb_pbs::{GetHeaderReponse, SignedBlindedBeaconBlock}; use reqwest::Error; +use crate::utils::generate_mock_relay; + pub struct MockValidator { - comm_boost: RelayEntry, - client: reqwest::Client, + comm_boost: RelayClient, } impl MockValidator { - pub fn new(address: SocketAddr) -> Self { - let client = reqwest::Client::new(); - - Self { - comm_boost: RelayEntry { - id: "".to_owned(), - pubkey: BlsPublicKey::ZERO, - url: format!("http://{address}"), - }, - client, - } + pub fn new(port: u16) -> Self { + Self { comm_boost: generate_mock_relay(port, BlsPublicKey::default()) } } pub async fn do_get_header(&self) -> Result<(), Error> { let url = self.comm_boost.get_header_url(0, B256::ZERO, BlsPublicKey::ZERO); - let res = self.client.get(url).send().await?.bytes().await?; + let res = self.comm_boost.client.get(url).send().await?.bytes().await?; assert!(serde_json::from_slice::(&res).is_ok()); Ok(()) @@ -35,7 +27,7 @@ impl MockValidator { pub async fn do_get_status(&self) -> Result<(), Error> { let url = self.comm_boost.get_status_url(); - let _res = self.client.get(url).send().await?; + let _res = self.comm_boost.client.get(url).send().await?; // assert!(res.status().is_success()); Ok(()) @@ -46,7 +38,8 @@ impl MockValidator { let registration: Vec = vec![]; - self.client + self.comm_boost + .client .post(url) .header("Content-Type", "application/json") .body(serde_json::to_string(®istration).unwrap()) @@ -62,7 +55,8 @@ impl MockValidator { let signed_blinded_block = SignedBlindedBeaconBlock::default(); - self.client + self.comm_boost + .client .post(url) .header("Content-Type", "application/json") .body(serde_json::to_string(&signed_blinded_block).unwrap()) diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 7b04140c..b1975bac 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -1,5 +1,8 @@ use std::sync::Once; +use alloy::rpc::types::beacon::BlsPublicKey; +use cb_common::pbs::{RelayClient, RelayConfig, RelayEntry}; + pub fn get_local_address(port: u16) -> String { format!("http://0.0.0.0:{port}") } @@ -10,3 +13,9 @@ pub fn setup_test_env() { tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init(); }); } + +pub fn generate_mock_relay(port: u16, pubkey: BlsPublicKey) -> RelayClient { + let entry = RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port) }; + let config = RelayConfig { entry, ..RelayConfig::default() }; + RelayClient::new(config) +} diff --git a/tests/tests/config.rs b/tests/tests/config.rs new file mode 100644 index 00000000..090ff103 --- /dev/null +++ b/tests/tests/config.rs @@ -0,0 +1,10 @@ +use cb_common::{config::CommitBoostConfig, types::Chain}; + +#[tokio::test] +async fn test_load_config() { + let config = CommitBoostConfig::from_file("../config.example.toml"); + + assert_eq!(config.chain, Chain::Holesky); + assert!(config.relays[0].headers.is_some()) + // TODO: add more +} diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index 329899ed..86f818d6 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -1,10 +1,9 @@ -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration, u64}; use alloy::primitives::U256; -use alloy::rpc::types::beacon::BlsPublicKey; use cb_common::{ - config::{PbsConfig, PbsModuleConfig}, - pbs::RelayEntry, + config::PbsModuleConfig, + pbs::{PbsConfig, RelayClient}, signer::Signer, types::Chain, }; @@ -12,15 +11,11 @@ use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ mock_relay::{mock_relay_app_router, MockRelayState}, mock_validator::MockValidator, - utils::{get_local_address, setup_test_env}, + utils::{generate_mock_relay, setup_test_env}, }; use tokio::net::TcpListener; use tracing::info; -fn generate_mock_relay(port: u16, pubkey: BlsPublicKey) -> RelayEntry { - RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port) } -} - async fn start_mock_relay_service(state: Arc, port: u16) { let app = mock_relay_app_router(state); @@ -31,22 +26,32 @@ async fn start_mock_relay_service(state: Arc, port: u16) { axum::serve(listener, app).await.unwrap(); } -fn get_pbs_static_config(port: u16, relays: Vec) -> PbsConfig { +fn get_pbs_static_config(port: u16) -> PbsConfig { PbsConfig { port, - relays, relay_check: true, timeout_get_header_ms: u64::MAX, timeout_get_payload_ms: u64::MAX, timeout_register_validator_ms: u64::MAX, skip_sigverify: false, min_bid_wei: U256::ZERO, - headers: None, + late_in_slot_time_ms: u64::MAX, + skip_header_late_in_slot: false, } } -fn to_pbs_config(chain: Chain, pbs_config: PbsConfig) -> PbsModuleConfig<()> { - PbsModuleConfig { chain, pbs_config: Arc::new(pbs_config), signer_client: None, extra: () } +fn to_pbs_config( + chain: Chain, + pbs_config: PbsConfig, + relays: Vec, +) -> PbsModuleConfig<()> { + PbsModuleConfig { + chain, + pbs_config: Arc::new(pbs_config), + signer_client: None, + extra: (), + relays, + } } #[tokio::test] @@ -61,15 +66,14 @@ async fn test_get_header() { let mock_state = Arc::new(MockRelayState::new(chain, signer, 0)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - let config = to_pbs_config(chain, get_pbs_static_config(port, vec![mock_relay])); + let config = to_pbs_config(chain, get_pbs_static_config(port), vec![mock_relay]); let state = PbsState::new(config); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; - let address = format!("0.0.0.0:{port}").parse().unwrap(); - let mock_validator = MockValidator::new(address); + let mock_validator = MockValidator::new(port); info!("Sending get header"); let res = mock_validator.do_get_header().await; @@ -93,15 +97,14 @@ async fn test_get_status() { tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 2)); - let config = to_pbs_config(chain, get_pbs_static_config(port, relays)); + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); let state = PbsState::new(config); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; - let address = format!("0.0.0.0:{port}").parse().unwrap(); - let mock_validator = MockValidator::new(address); + let mock_validator = MockValidator::new(port); info!("Sending get status"); let res = mock_validator.do_get_status().await; @@ -121,15 +124,14 @@ async fn test_register_validators() { let mock_state = Arc::new(MockRelayState::new(chain, signer, 0)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - let config = to_pbs_config(chain, get_pbs_static_config(port, relays)); + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); let state = PbsState::new(config); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; - let address = format!("0.0.0.0:{port}").parse().unwrap(); - let mock_validator = MockValidator::new(address); + let mock_validator = MockValidator::new(port); info!("Sending register validator"); let res = mock_validator.do_register_validator().await; @@ -149,15 +151,14 @@ async fn test_submit_block() { let mock_state = Arc::new(MockRelayState::new(chain, signer, 0)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - let config = to_pbs_config(chain, get_pbs_static_config(port, relays)); + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); let state = PbsState::new(config); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; - let address = format!("0.0.0.0:{port}").parse().unwrap(); - let mock_validator = MockValidator::new(address); + let mock_validator = MockValidator::new(port); info!("Sending submit block"); let res = mock_validator.do_submit_block().await; From d22b1ab6af69c2a32dedf50ff748c015d4159341 Mon Sep 17 00:00:00 2001 From: ltitanb Date: Tue, 30 Jul 2024 11:32:25 +0100 Subject: [PATCH 3/5] switch to time in slot timeouts --- config.example.toml | 4 +- crates/common/src/pbs/config.rs | 4 +- crates/common/src/utils.rs | 4 + crates/pbs/src/error.rs | 6 + crates/pbs/src/mev_boost/get_header.rs | 115 ++++++++++++------ .../pbs/src/mev_boost/register_validator.rs | 2 +- crates/pbs/src/mev_boost/status.rs | 2 +- crates/pbs/src/mev_boost/submit_block.rs | 2 +- crates/pbs/src/routes/get_header.rs | 6 +- crates/pbs/src/routes/router.rs | 25 +--- 10 files changed, 96 insertions(+), 74 deletions(-) diff --git a/config.example.toml b/config.example.toml index 79a1b57d..afcc22aa 100644 --- a/config.example.toml +++ b/config.example.toml @@ -17,8 +17,8 @@ id = "example-relay" url = "http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz" headers = { X-MyCustomHeader = "MyCustomValue" } enable_timing_games = false -wait_first_header_ms = 250 -frequency_get_header_ms = 100 +target_first_request_ms = 200 +frequency_get_header_ms = 300 [signer] [signer.loader] diff --git a/crates/common/src/pbs/config.rs b/crates/common/src/pbs/config.rs index 853b00da..c63f84fc 100644 --- a/crates/common/src/pbs/config.rs +++ b/crates/common/src/pbs/config.rs @@ -22,8 +22,8 @@ pub struct RelayConfig { pub headers: Option>, /// Whether to enable timing games pub enable_timing_games: bool, - /// Delay in ms to wait before sending the first get_header - pub wait_first_header_ms: Option, + /// Target time in slot when to send the first header request + pub target_first_request_ms: Option, /// Frequency in ms to send get_header requests pub frequency_get_header_ms: Option, } diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index c0b779b8..6cb17cfc 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -18,6 +18,10 @@ pub fn timestamp_of_slot_start_millis(slot: u64, chain: Chain) -> u64 { let seconds_since_genesis = chain.genesis_time_sec() + slot * SECONDS_PER_SLOT; seconds_since_genesis * MILLIS_PER_SECOND } +pub fn ms_into_slot(slot: u64, chain: Chain) -> u64 { + let slot_start_ms = timestamp_of_slot_start_millis(slot, chain); + utcnow_ms().saturating_sub(slot_start_ms) +} /// Seconds pub fn utcnow_sec() -> u64 { diff --git a/crates/pbs/src/error.rs b/crates/pbs/src/error.rs index 3b79a922..79216676 100644 --- a/crates/pbs/src/error.rs +++ b/crates/pbs/src/error.rs @@ -50,6 +50,12 @@ pub enum PbsError { Validation(#[from] ValidationError), } +impl PbsError { + pub fn is_timeout(&self) -> bool { + matches!(self, PbsError::Reqwest(err) if err.is_timeout()) + } +} + #[derive(Debug, Error, PartialEq, Eq)] pub enum ValidationError { #[error("empty blockhash")] diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index 02741191..287071e3 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -9,10 +9,11 @@ use cb_common::{ pbs::{PbsConfig, RelayClient, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, signature::verify_signed_builder_message, types::Chain, - utils::{get_user_agent, utcnow_ms}, + utils::{get_user_agent, ms_into_slot, utcnow_ms}, }; use futures::future::join_all; use reqwest::{header::USER_AGENT, StatusCode}; +use tokio::time::sleep; use tracing::{debug, error, warn}; use crate::{ @@ -33,6 +34,12 @@ pub async fn get_header( ) -> eyre::Result> { let (_, slot_uuid) = state.get_slot_and_uuid(); + let ms_into_slot = ms_into_slot(params.slot, state.config.chain); + let max_timeout_ms = state + .pbs_config() + .timeout_get_header_ms + .min(state.pbs_config().late_in_slot_time_ms.saturating_sub(ms_into_slot)); + // prepare headers, except for start time which is set in `send_one_get_header` let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); @@ -45,10 +52,12 @@ pub async fn get_header( for relay in relays.iter() { handles.push(send_timed_get_header( params, - relay, + relay.clone(), state.config.chain, state.pbs_config(), send_headers.clone(), + ms_into_slot, + max_timeout_ms, )); } @@ -60,9 +69,7 @@ pub async fn get_header( match res { Ok(Some(res)) => relay_bids.push(res), Ok(_) => {} - Err(PbsError::Reqwest(req_err)) if req_err.is_timeout() => { - error!(err = "Timed Out", relay_id) - } + Err(err) if err.is_timeout() => error!(err = "Timed Out", relay_id), Err(err) => error!(?err, relay_id), } } @@ -73,60 +80,80 @@ pub async fn get_header( #[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] async fn send_timed_get_header( params: GetHeaderParams, - relay: &RelayClient, + relay: RelayClient, chain: Chain, pbs_config: &PbsConfig, headers: HeaderMap, + ms_into_slot: u64, + mut timeout_left_ms: u64, ) -> Result, PbsError> { let url = relay.get_header_url(params.slot, params.parent_hash, params.pubkey); if relay.config.enable_timing_games { - if let Some(delay) = relay.config.wait_first_header_ms { - debug!(delay, "TG: waiting to send first header request"); - tokio::time::sleep(Duration::from_millis(delay)).await; + if let Some(target_ms) = relay.config.target_first_request_ms { + // sleep until target time in slot + + let delay = target_ms.saturating_sub(ms_into_slot); + if delay > 0 { + debug!(target_ms, ms_into_slot, "TG: waiting to send first header request"); + timeout_left_ms = timeout_left_ms.saturating_sub(delay); + sleep(Duration::from_millis(delay)).await; + } else { + debug!(target_ms, ms_into_slot, "TG: request already late enough in slot"); + } } if let Some(send_freq_ms) = relay.config.frequency_get_header_ms { - let start_time_ms = Instant::now(); let mut handles = Vec::new(); - let max_timeout_ms = pbs_config - .timeout_get_header_ms - .saturating_sub(relay.config.wait_first_header_ms.unwrap_or_default()); - debug!(send_freq_ms, max_timeout_ms, "TG: sending multiple header requests"); + debug!(send_freq_ms, timeout_left_ms, "TG: sending multiple header requests"); loop { - let timeout_left_ms = - max_timeout_ms.saturating_sub(start_time_ms.elapsed().as_millis() as u64); - - handles.push(send_one_get_header( + handles.push(tokio::spawn(send_one_get_header( url.clone(), params, - relay, + relay.clone(), chain, - pbs_config, headers.clone(), - Some(timeout_left_ms), - )); + timeout_left_ms, + pbs_config.skip_sigverify, + pbs_config.min_bid_wei, + ))); if timeout_left_ms > send_freq_ms { // enough time for one more - tokio::time::sleep(Duration::from_millis(send_freq_ms)).await; + timeout_left_ms = timeout_left_ms.saturating_sub(send_freq_ms); + sleep(Duration::from_millis(send_freq_ms)).await; } else { break; } } let results = join_all(handles).await; - let n_headers = results.iter().filter(|header| header.is_ok()).count(); - - if let Some((_, maybe_header)) = - results.into_iter().filter_map(|r| r.ok()).max_by_key(|(start_time, _)| *start_time) + let mut n_headers = 0; + + if let Some((_, maybe_header)) = results + .into_iter() + .filter_map(|res| { + // ignore join error and timeouts, log other errors + res.ok().and_then(|inner_res| match inner_res { + Ok(maybe_header) => { + n_headers += 1; + Some(maybe_header) + } + Err(err) if err.is_timeout() => None, + Err(err) => { + error!(?err, "TG: error sending header request"); + None + } + }) + }) + .max_by_key(|(start_time, _)| *start_time) { debug!(n_headers, "TG: received headers from relay"); return Ok(maybe_header) } else { - // all requests failed (likely all timeout) + // all requests failed warn!("TG: no headers received"); return Err(PbsError::RelayResponse { @@ -138,19 +165,29 @@ async fn send_timed_get_header( } // if no timing games or no repeated send, just send one request - send_one_get_header(url, params, relay, chain, pbs_config, headers, None) - .await - .map(|(_, maybe_header)| maybe_header) + send_one_get_header( + url, + params, + relay, + chain, + headers, + timeout_left_ms, + pbs_config.skip_sigverify, + pbs_config.min_bid_wei, + ) + .await + .map(|(_, maybe_header)| maybe_header) } async fn send_one_get_header( url: String, params: GetHeaderParams, - relay: &RelayClient, + relay: RelayClient, chain: Chain, - pbs_config: &PbsConfig, mut headers: HeaderMap, - timeout_ms: Option, + timeout_ms: u64, + skip_sigverify: bool, + min_bid_wei: U256, ) -> Result<(u64, Option), PbsError> { // the timestamp in the header is the consensus block time which is fixed, // use the beginning of the request as proxy to make sure we use only the @@ -162,7 +199,7 @@ async fn send_one_get_header( let res = match relay .client .get(url) - .timeout(Duration::from_millis(timeout_ms.unwrap_or(pbs_config.timeout_get_header_ms))) + .timeout(Duration::from_millis(timeout_ms)) .headers(headers) .send() .await @@ -195,7 +232,7 @@ async fn send_one_get_header( if code == StatusCode::NO_CONTENT { debug!( ?code, - ?request_latency, + latency = ?request_latency, response = ?response_bytes, "no header from relay" ); @@ -205,7 +242,7 @@ async fn send_one_get_header( let get_header_response: GetHeaderReponse = serde_json::from_slice(&response_bytes)?; debug!( - ?request_latency, + latency = ?request_latency, block_hash = %get_header_response.block_hash(), value_eth = format_ether(get_header_response.value()), "received new header" @@ -216,8 +253,8 @@ async fn send_one_get_header( chain, relay.pubkey(), params.parent_hash, - pbs_config.skip_sigverify, - pbs_config.min_bid_wei, + skip_sigverify, + min_bid_wei, )?; Ok((start_request_time, Some(get_header_response))) diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index db832c89..7b8c3d93 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -106,7 +106,7 @@ async fn send_register_validator( return Err(err); }; - debug!(?code, ?request_latency, "registration successful"); + debug!(?code, latency = ?request_latency, "registration successful"); Ok(()) } diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index b710a529..463198c7 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -85,7 +85,7 @@ async fn send_relay_check(relay: &RelayClient, headers: HeaderMap) -> Result<(), return Err(err) }; - debug!(?code, ?request_latency, "status passed"); + debug!(?code, latency = ?request_latency, "status passed"); Ok(()) } diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 84bc5830..853eccb6 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -109,7 +109,7 @@ async fn send_submit_block( let block_response: SubmitBlindedBlockResponse = serde_json::from_slice(&response_bytes)?; debug!( - ?request_latency, + latency = ?request_latency, block_hash = %block_response.block_hash(), "received unblinded block" ); diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 717d5879..8dcb3bc5 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -4,7 +4,7 @@ use axum::{ http::HeaderMap, response::IntoResponse, }; -use cb_common::utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}; +use cb_common::utils::{get_user_agent, ms_into_slot}; use reqwest::StatusCode; use tracing::{error, info, warn}; use uuid::Uuid; @@ -27,10 +27,8 @@ pub async fn handle_get_header>( state.publish_event(BuilderEvent::GetHeaderRequest(params)); state.get_or_update_slot_uuid(params.slot); - let slot_start_ms = timestamp_of_slot_start_millis(params.slot, state.config.chain); let ua = get_user_agent(&req_headers); - - let ms_into_slot = utcnow_ms().saturating_sub(slot_start_ms); + let ms_into_slot = ms_into_slot(params.slot, state.config.chain); info!(?ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot); if state.is_late_in_slot(ms_into_slot) && state.pbs_config().skip_header_late_in_slot { diff --git a/crates/pbs/src/routes/router.rs b/crates/pbs/src/routes/router.rs index df67ed4f..13af21c6 100644 --- a/crates/pbs/src/routes/router.rs +++ b/crates/pbs/src/routes/router.rs @@ -1,14 +1,10 @@ use axum::{ - extract::Request, - http::{Response, StatusCode}, - middleware::{map_request, map_response}, routing::{get, post}, Router, }; use cb_common::pbs::{ BULDER_API_PATH, GET_HEADER_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, }; -use tracing::debug; use super::{handle_get_header, handle_get_status, handle_register_validator, handle_submit_block}; use crate::{ @@ -31,24 +27,5 @@ pub fn create_app_router>(state: PbsState StatusCode { - StatusCode::NOT_FOUND -} - -// TODO: remove -async fn log_all_responses(response: Response) -> Response { - debug!("RECEIVED RESPONSE: {response:?}"); - response -} - -// TODO: remove -async fn log_all_requests(request: Request) -> Request { - debug!("SENDING REQUEST: {request:?}"); - request + app.with_state(state) } From 978a7ab058f85362dbbbe137914b57247561870d Mon Sep 17 00:00:00 2001 From: ltitanb Date: Tue, 30 Jul 2024 11:51:27 +0100 Subject: [PATCH 4/5] removed redundant param --- config.example.toml | 1 - crates/common/src/pbs/config.rs | 3 --- crates/pbs/src/mev_boost/get_header.rs | 14 ++++++++++++-- crates/pbs/src/routes/get_header.rs | 12 +----------- crates/pbs/src/state.rs | 4 ---- 5 files changed, 13 insertions(+), 21 deletions(-) diff --git a/config.example.toml b/config.example.toml index afcc22aa..bfd1add3 100644 --- a/config.example.toml +++ b/config.example.toml @@ -10,7 +10,6 @@ skip_sigverify = true min_bid_eth = 0.0 late_in_slot_time_ms = 2000 -skip_header_late_in_slot = true [[relays]] id = "example-relay" diff --git a/crates/common/src/pbs/config.rs b/crates/common/src/pbs/config.rs index c63f84fc..aadbf33c 100644 --- a/crates/common/src/pbs/config.rs +++ b/crates/common/src/pbs/config.rs @@ -52,7 +52,4 @@ pub struct PbsConfig { /// How late in the slot we consider to be "late" #[serde(default = "default_u64::")] pub late_in_slot_time_ms: u64, - /// If it's too late in the slot, skip get header and force local build - #[serde(default = "default_bool::")] - pub skip_header_late_in_slot: bool, } diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index 287071e3..e8d2b3cf 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -32,14 +32,24 @@ pub async fn get_header( req_headers: HeaderMap, state: PbsState, ) -> eyre::Result> { - let (_, slot_uuid) = state.get_slot_and_uuid(); - let ms_into_slot = ms_into_slot(params.slot, state.config.chain); let max_timeout_ms = state .pbs_config() .timeout_get_header_ms .min(state.pbs_config().late_in_slot_time_ms.saturating_sub(ms_into_slot)); + if max_timeout_ms == 0 { + warn!( + ms_into_slot, + threshold = state.pbs_config().late_in_slot_time_ms, + "late in slot, skipping relay requests" + ); + + return Ok(None) + } + + let (_, slot_uuid) = state.get_slot_and_uuid(); + // prepare headers, except for start time which is set in `send_one_get_header` let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 8dcb3bc5..1691a6b5 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -6,7 +6,7 @@ use axum::{ }; use cb_common::utils::{get_user_agent, ms_into_slot}; use reqwest::StatusCode; -use tracing::{error, info, warn}; +use tracing::{error, info}; use uuid::Uuid; use crate::{ @@ -31,16 +31,6 @@ pub async fn handle_get_header>( let ms_into_slot = ms_into_slot(params.slot, state.config.chain); info!(?ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot); - if state.is_late_in_slot(ms_into_slot) && state.pbs_config().skip_header_late_in_slot { - // too late into slot, force local build - warn!( - ms_into_slot, - threshold = state.pbs_config().late_in_slot_time_ms, - "late in slot, skipping relay requests" - ); - BEACON_NODE_STATUS.with_label_values(&["204", GET_HEADER_ENDPOINT_TAG]).inc(); - return Ok(StatusCode::NO_CONTENT.into_response()); - } match T::get_header(params, req_headers, state.clone()).await { Ok(res) => { diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index 870d1f1c..9daa1dcb 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -82,10 +82,6 @@ where *guard } - pub fn is_late_in_slot(&self, ms_into_slot: u64) -> bool { - ms_into_slot > self.config.pbs_config.late_in_slot_time_ms - } - // Getters pub fn pbs_config(&self) -> &PbsConfig { &self.config.pbs_config From f82b852ce57391535ebc2019836b3e3982aa4477 Mon Sep 17 00:00:00 2001 From: ltitanb Date: Tue, 30 Jul 2024 13:28:35 +0100 Subject: [PATCH 5/5] fixes --- crates/common/src/pbs/config.rs | 1 + crates/pbs/src/mev_boost/get_header.rs | 25 ++++++++++++++----------- tests/tests/pbs_integration.rs | 1 - 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/crates/common/src/pbs/config.rs b/crates/common/src/pbs/config.rs index aadbf33c..d690f40f 100644 --- a/crates/common/src/pbs/config.rs +++ b/crates/common/src/pbs/config.rs @@ -21,6 +21,7 @@ pub struct RelayConfig { /// Optional headers to send with each request pub headers: Option>, /// Whether to enable timing games + #[serde(default = "default_bool::")] pub enable_timing_games: bool, /// Target time in slot when to send the first header request pub target_first_request_ms: Option, diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index e8d2b3cf..8e711f85 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -14,7 +14,7 @@ use cb_common::{ use futures::future::join_all; use reqwest::{header::USER_AGENT, StatusCode}; use tokio::time::sleep; -use tracing::{debug, error, warn}; +use tracing::{debug, error, warn, Instrument}; use crate::{ constants::{GET_HEADER_ENDPOINT_TAG, TIMEOUT_ERROR_CODE, TIMEOUT_ERROR_CODE_STR}, @@ -119,16 +119,19 @@ async fn send_timed_get_header( debug!(send_freq_ms, timeout_left_ms, "TG: sending multiple header requests"); loop { - handles.push(tokio::spawn(send_one_get_header( - url.clone(), - params, - relay.clone(), - chain, - headers.clone(), - timeout_left_ms, - pbs_config.skip_sigverify, - pbs_config.min_bid_wei, - ))); + handles.push(tokio::spawn( + send_one_get_header( + url.clone(), + params, + relay.clone(), + chain, + headers.clone(), + timeout_left_ms, + pbs_config.skip_sigverify, + pbs_config.min_bid_wei, + ) + .in_current_span(), + )); if timeout_left_ms > send_freq_ms { // enough time for one more diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index 86f818d6..72e4873e 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -36,7 +36,6 @@ fn get_pbs_static_config(port: u16) -> PbsConfig { skip_sigverify: false, min_bid_wei: U256::ZERO, late_in_slot_time_ms: u64::MAX, - skip_header_late_in_slot: false, } }