diff --git a/Cargo.lock b/Cargo.lock index f3c3e9ec..fcf8d070 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3210,10 +3210,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "winreg", ] @@ -4109,9 +4111,9 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" @@ -4484,6 +4486,19 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "wasm-streams" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e072d4e72f700fb3443d8fe94a39315df013eef1104903cdb0a2abd322bbecd" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.69" diff --git a/Cargo.toml b/Cargo.toml index c3bfd060..4f874840 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ ethereum_serde_utils = "0.7.0" # networking axum = { version = "0.7.5", features = ["macros"] } axum-extra = { version = "0.9.3", features = ["typed-header"] } -reqwest = { version = "0.12.4", features = ["json"] } +reqwest = { version = "0.12.4", features = ["json", "stream"] } headers = "0.4.0" # async / threads diff --git a/crates/common/src/pbs/error.rs b/crates/common/src/pbs/error.rs index 45922a00..8330d03a 100644 --- a/crates/common/src/pbs/error.rs +++ b/crates/common/src/pbs/error.rs @@ -8,20 +8,20 @@ use crate::error::BlstErrorWrapper; #[derive(Debug, Error)] pub enum PbsError { - #[error("axum error: {0}")] + #[error("axum error: {0:?}")] AxumError(#[from] axum::Error), - #[error("reqwest error: {0}")] + #[error("reqwest error: {0:?}")] Reqwest(#[from] reqwest::Error), - #[error("json decode error: {err}, raw: {raw}")] + #[error("json decode error: {err:?}, raw: {raw}")] JsonDecode { err: serde_json::Error, raw: String }, #[error("relay response error. Code: {code}, err: {error_msg}")] RelayResponse { error_msg: String, code: u16 }, - #[error("response size exceeds max size: max: {max} got: {got}")] - PayloadTooLarge { max: usize, got: usize }, + #[error("response size exceeds max size: max: {max} raw: {raw}")] + PayloadTooLarge { max: usize, raw: String }, #[error("failed validating relay response: {0}")] Validation(#[from] ValidationError), diff --git a/crates/common/src/pbs/relay.rs b/crates/common/src/pbs/relay.rs index 6802890d..3a0702e1 100644 --- a/crates/common/src/pbs/relay.rs +++ b/crates/common/src/pbs/relay.rs @@ -16,8 +16,6 @@ use super::{ }; use crate::{config::RelayConfig, DEFAULT_REQUEST_TIMEOUT}; -pub const MAX_SIZE: usize = 10 * 1024 * 1024; - /// A parsed entry of the relay url in the format: scheme://pubkey@host #[derive(Debug, Clone)] pub struct RelayEntry { diff --git a/crates/pbs/src/constants.rs b/crates/pbs/src/constants.rs index d0d4d8d6..54156d1b 100644 --- a/crates/pbs/src/constants.rs +++ b/crates/pbs/src/constants.rs @@ -1,8 +1,17 @@ -pub(crate) const STATUS_ENDPOINT_TAG: &str = "status"; -pub(crate) const REGISTER_VALIDATOR_ENDPOINT_TAG: &str = "register_validator"; -pub(crate) const SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG: &str = "submit_blinded_block"; -pub(crate) const GET_HEADER_ENDPOINT_TAG: &str = "get_header"; +pub const STATUS_ENDPOINT_TAG: &str = "status"; +pub const REGISTER_VALIDATOR_ENDPOINT_TAG: &str = "register_validator"; +pub const SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG: &str = "submit_blinded_block"; +pub const GET_HEADER_ENDPOINT_TAG: &str = "get_header"; /// For metrics recorded when a request times out -pub(crate) const TIMEOUT_ERROR_CODE: u16 = 555; -pub(crate) const TIMEOUT_ERROR_CODE_STR: &str = "555"; +pub const TIMEOUT_ERROR_CODE: u16 = 555; +pub const TIMEOUT_ERROR_CODE_STR: &str = "555"; + +/// 20 MiB to cover edge cases for heavy blocks and also add a bit of slack for +/// any Ethereum upgrades in the near future +pub const MAX_SIZE_SUBMIT_BLOCK: usize = 20 * 1024 * 1024; + +/// 10 KiB, headers are around 700 bytes + buffer for encoding +pub const MAX_SIZE_GET_HEADER: usize = 10 * 1024; + +pub const MAX_SIZE_DEFAULT: usize = 1024; diff --git a/crates/pbs/src/lib.rs b/crates/pbs/src/lib.rs index e3b219c5..efe76d4e 100644 --- a/crates/pbs/src/lib.rs +++ b/crates/pbs/src/lib.rs @@ -6,8 +6,10 @@ mod mev_boost; mod routes; mod service; mod state; +mod utils; pub use api::*; +pub use constants::*; pub use mev_boost::*; pub use service::PbsService; pub use state::{BuilderApiState, PbsState}; diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index 5b07454a..c83e77e4 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -11,7 +11,7 @@ use cb_common::{ pbs::{ error::{PbsError, ValidationError}, GetHeaderParams, GetHeaderResponse, RelayClient, SignedExecutionPayloadHeader, - EMPTY_TX_ROOT_HASH, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS, MAX_SIZE, + EMPTY_TX_ROOT_HASH, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS, }, signature::verify_signed_message, types::Chain, @@ -24,9 +24,12 @@ use tracing::{debug, error, warn, Instrument}; use url::Url; use crate::{ - constants::{GET_HEADER_ENDPOINT_TAG, TIMEOUT_ERROR_CODE, TIMEOUT_ERROR_CODE_STR}, + constants::{ + GET_HEADER_ENDPOINT_TAG, MAX_SIZE_GET_HEADER, TIMEOUT_ERROR_CODE, TIMEOUT_ERROR_CODE_STR, + }, metrics::{RELAY_HEADER_VALUE, RELAY_LAST_SLOT, RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, + utils::read_chunked_body_with_max, }; /// Implements https://ethereum.github.io/builder-specs/#/Builder/getHeader @@ -247,11 +250,7 @@ async fn send_one_get_header( let code = res.status(); RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc(); - let response_bytes = res.bytes().await?; - if response_bytes.len() > MAX_SIZE { - return Err(PbsError::PayloadTooLarge { max: MAX_SIZE, got: response_bytes.len() }); - } - + let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER).await?; if !code.is_success() { return Err(PbsError::RelayResponse { error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 25d68fa0..6c124693 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -3,7 +3,7 @@ use std::time::{Duration, Instant}; use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ - pbs::{error::PbsError, RelayClient, HEADER_START_TIME_UNIX_MS, MAX_SIZE}, + pbs::{error::PbsError, RelayClient, HEADER_START_TIME_UNIX_MS}, utils::{get_user_agent_with_version, utcnow_ms}, }; use eyre::bail; @@ -12,9 +12,10 @@ use reqwest::header::USER_AGENT; use tracing::{debug, error, Instrument}; use crate::{ - constants::{REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, + constants::{MAX_SIZE_DEFAULT, REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, + utils::read_chunked_body_with_max, }; /// Implements https://ethereum.github.io/builder-specs/#/Builder/registerValidator @@ -103,11 +104,8 @@ async fn send_register_validator( .with_label_values(&[code.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id]) .inc(); - let response_bytes = res.bytes().await?; - if response_bytes.len() > MAX_SIZE { - return Err(PbsError::PayloadTooLarge { max: MAX_SIZE, got: response_bytes.len() }); - } if !code.is_success() { + let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_DEFAULT).await?; let err = PbsError::RelayResponse { error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), code: code.as_u16(), diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index 29a268ba..5fdf7db5 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -2,7 +2,7 @@ use std::time::{Duration, Instant}; use axum::http::HeaderMap; use cb_common::{ - pbs::{error::PbsError, RelayClient, MAX_SIZE}, + pbs::{error::PbsError, RelayClient}, utils::get_user_agent_with_version, }; use futures::future::select_ok; @@ -10,9 +10,10 @@ use reqwest::header::USER_AGENT; use tracing::{debug, error}; use crate::{ - constants::{STATUS_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, + constants::{MAX_SIZE_DEFAULT, STATUS_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, + utils::read_chunked_body_with_max, }; /// Implements https://ethereum.github.io/builder-specs/#/Builder/status @@ -74,11 +75,8 @@ async fn send_relay_check(relay: &RelayClient, headers: HeaderMap) -> Result<(), let code = res.status(); RELAY_STATUS_CODE.with_label_values(&[code.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc(); - let response_bytes = res.bytes().await?; - if response_bytes.len() > MAX_SIZE { - return Err(PbsError::PayloadTooLarge { max: MAX_SIZE, got: response_bytes.len() }); - } if !code.is_success() { + let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_DEFAULT).await?; let err = PbsError::RelayResponse { error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), code: code.as_u16(), diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 295cc354..5fdedfce 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -5,7 +5,7 @@ use cb_common::{ pbs::{ error::{PbsError, ValidationError}, RelayClient, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse, HEADER_SLOT_UUID_KEY, - HEADER_START_TIME_UNIX_MS, MAX_SIZE, + HEADER_START_TIME_UNIX_MS, }, utils::{get_user_agent_with_version, utcnow_ms}, }; @@ -14,9 +14,10 @@ use reqwest::header::USER_AGENT; use tracing::{debug, warn}; use crate::{ - constants::{SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, + constants::{MAX_SIZE_SUBMIT_BLOCK, SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, + utils::read_chunked_body_with_max, }; /// Implements https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock @@ -94,18 +95,14 @@ async fn send_submit_block( .with_label_values(&[code.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) .inc(); - let response_bytes = res.bytes().await?; - - if response_bytes.len() > MAX_SIZE { - return Err(PbsError::PayloadTooLarge { max: MAX_SIZE, got: response_bytes.len() }); - } + let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK).await?; if !code.is_success() { let err = PbsError::RelayResponse { error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), code: code.as_u16(), }; - // we request payload to all relays, but some may have not received it + // we requested the payload from all relays, but some may have not received it warn!(%err, "failed to get payload (this might be ok if other relays have it)"); return Err(err); }; diff --git a/crates/pbs/src/utils.rs b/crates/pbs/src/utils.rs new file mode 100644 index 00000000..a7b99f00 --- /dev/null +++ b/crates/pbs/src/utils.rs @@ -0,0 +1,27 @@ +use cb_common::pbs::error::PbsError; +use futures::StreamExt; +use reqwest::Response; + +pub async fn read_chunked_body_with_max( + res: Response, + max_size: usize, +) -> Result, PbsError> { + let mut stream = res.bytes_stream(); + let mut response_bytes = Vec::new(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + if response_bytes.len() + chunk.len() > max_size { + // avoid spamming logs if the message is too large + response_bytes.truncate(1024); + return Err(PbsError::PayloadTooLarge { + max: max_size, + raw: String::from_utf8_lossy(&response_bytes).into_owned(), + }); + } + + response_bytes.extend_from_slice(&chunk); + } + + Ok(response_bytes) +} diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs index e9e42ca3..80dd8f15 100644 --- a/tests/src/mock_relay.rs +++ b/tests/src/mock_relay.rs @@ -24,6 +24,7 @@ use cb_common::{ types::Chain, utils::blst_pubkey_to_alloy, }; +use cb_pbs::MAX_SIZE_SUBMIT_BLOCK; use tokio::net::TcpListener; use tracing::debug; use tree_hash::TreeHash; @@ -41,6 +42,7 @@ pub async fn start_mock_relay_service(state: Arc, port: u16) -> pub struct MockRelayState { pub chain: Chain, pub signer: BlsSecretKey, + large_body: bool, received_get_header: Arc, received_get_status: Arc, received_register_validator: Arc, @@ -67,12 +69,17 @@ impl MockRelayState { Self { chain, signer, + large_body: false, received_get_header: Default::default(), received_get_status: Default::default(), received_register_validator: Default::default(), received_submit_block: Default::default(), } } + + pub fn with_large_body(self) -> Self { + Self { large_body: true, ..self } + } } pub fn mock_relay_app_router(state: Arc) -> Router { @@ -119,6 +126,12 @@ async fn handle_register_validator( async fn handle_submit_block(State(state): State>) -> impl IntoResponse { state.received_submit_block.fetch_add(1, Ordering::Relaxed); - let response = SubmitBlindedBlockResponse::default(); + + let response = if state.large_body { + vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK] + } else { + serde_json::to_vec(&SubmitBlindedBlockResponse::default()).unwrap() + }; + (StatusCode::OK, Json(response)).into_response() } diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index e5e7ffaa..44301235 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -37,14 +37,7 @@ impl MockValidator { let registration: Vec = vec![]; - self.comm_boost - .client - .post(url) - .header("Content-Type", "application/json") - .body(serde_json::to_string(®istration).unwrap()) - .send() - .await? - .error_for_status()?; + self.comm_boost.client.post(url).json(®istration).send().await?.error_for_status()?; Ok(()) } @@ -57,8 +50,7 @@ impl MockValidator { self.comm_boost .client .post(url) - .header("Content-Type", "application/json") - .body(serde_json::to_string(&signed_blinded_block).unwrap()) + .json(&signed_blinded_block) .send() .await? .error_for_status()?; diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 0ee7f526..5fdb425c 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -14,7 +14,7 @@ pub fn get_local_address(port: u16) -> String { static SYNC_SETUP: Once = Once::new(); pub fn setup_test_env() { SYNC_SETUP.call_once(|| { - tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init(); + tracing_subscriber::fmt().with_max_level(tracing::Level::TRACE).init(); }); } diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index d6a3af98..005933fa 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -155,7 +155,36 @@ async fn test_submit_block() -> Result<()> { info!("Sending submit block"); let res = mock_validator.do_submit_block().await; - assert!(res.is_ok()); + assert!(res.is_err()); + assert_eq!(mock_state.received_submit_block(), 1); + Ok(()) +} + +#[tokio::test] +async fn test_submit_block_too_large() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let port = 3500; + + let relays = vec![generate_mock_relay(port + 1, *pubkey)?]; + let mock_state = Arc::new(MockRelayState::new(chain, signer).with_large_body()); + tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); + + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(port)?; + info!("Sending submit block"); + let res = mock_validator.do_submit_block().await; + + assert!(res.is_err()); assert_eq!(mock_state.received_submit_block(), 1); Ok(()) }