From b54b0e73286e3d3fc60dae231e822979e6d78df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20I=C3=B1aki=20Bilbao?= Date: Thu, 9 Jan 2025 16:45:51 -0300 Subject: [PATCH 1/5] Add successful num_registrations to logs --- crates/pbs/src/api.rs | 2 +- .../pbs/src/mev_boost/register_validator.rs | 10 +++--- crates/pbs/src/routes/register_validator.rs | 31 ++++++++++++------- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/crates/pbs/src/api.rs b/crates/pbs/src/api.rs index 217e438a..969e835c 100644 --- a/crates/pbs/src/api.rs +++ b/crates/pbs/src/api.rs @@ -45,7 +45,7 @@ pub trait BuilderApi: 'static { registrations: Vec, req_headers: HeaderMap, state: PbsState, - ) -> eyre::Result<()> { + ) -> eyre::Result { mev_boost::register_validator(registrations, req_headers, state).await } } diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 9f995a86..98e7c8a8 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -25,7 +25,7 @@ pub async fn register_validator( registrations: Vec, req_headers: HeaderMap, state: PbsState, -) -> eyre::Result<()> { +) -> eyre::Result { // prepare headers let mut send_headers = HeaderMap::new(); send_headers @@ -49,8 +49,10 @@ pub async fn register_validator( if state.pbs_config().wait_all_registrations { // wait for all relays registrations to complete let results = join_all(handles).await; - if results.into_iter().any(|res| res.is_ok_and(|res| res.is_ok())) { - Ok(()) + let successful = results.iter().flatten().filter(|res| res.is_ok()).count(); + + if successful > 0 { + Ok(successful) } else { bail!("No relay passed register_validator successfully") } @@ -58,7 +60,7 @@ pub async fn register_validator( // return once first completes, others proceed in background let result = select_ok(handles).await?; match result.0 { - Ok(_) => Ok(()), + Ok(_) => Ok(1), Err(_) => bail!("No relay passed register_validator successfully"), } } diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index fd73837b..bfdd94f1 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -39,20 +39,27 @@ pub async fn handle_register_validator>( } } - if let Err(err) = A::register_validator(registrations, req_headers, state.clone()).await { - state.publish_event(BuilderEvent::RegisterValidatorResponse); - error!(%err, "all relays failed registration"); + match A::register_validator(registrations, req_headers, state.clone()).await { + Ok(num_successful) => { + if state.pbs_config().wait_all_registrations { + info!(num_registrations = num_successful, "register validator successful"); + } else { + info!("register validator successful in at least one relay"); + } - let err = PbsClientError::NoResponse; - BEACON_NODE_STATUS - .with_label_values(&[err.status_code().as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG]) - .inc(); - Err(err) - } else { - info!("register validator successful"); + BEACON_NODE_STATUS.with_label_values(&["200", REGISTER_VALIDATOR_ENDPOINT_TAG]).inc(); + Ok(StatusCode::OK) + } + Err(err) => { + state.publish_event(BuilderEvent::RegisterValidatorResponse); + error!(%err, "all relays failed registration"); - BEACON_NODE_STATUS.with_label_values(&["200", REGISTER_VALIDATOR_ENDPOINT_TAG]).inc(); - Ok(StatusCode::OK) + let err = PbsClientError::NoResponse; + BEACON_NODE_STATUS + .with_label_values(&[err.status_code().as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG]) + .inc(); + Err(err) + } } } From 43db9c3b8aa73b117ec20c2e1c1a6d82ffc32f6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20I=C3=B1aki=20Bilbao?= Date: Thu, 16 Jan 2025 12:30:39 -0300 Subject: [PATCH 2/5] Log when all relays respond --- crates/pbs/src/api.rs | 2 +- .../pbs/src/mev_boost/register_validator.rs | 36 ++++++++++++++----- crates/pbs/src/routes/register_validator.rs | 31 +++++++--------- 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/crates/pbs/src/api.rs b/crates/pbs/src/api.rs index 969e835c..217e438a 100644 --- a/crates/pbs/src/api.rs +++ b/crates/pbs/src/api.rs @@ -45,7 +45,7 @@ pub trait BuilderApi: 'static { registrations: Vec, req_headers: HeaderMap, state: PbsState, - ) -> eyre::Result { + ) -> eyre::Result<()> { mev_boost::register_validator(registrations, req_headers, state).await } } diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 5db375c3..7a40901e 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -7,9 +7,9 @@ use cb_common::{ utils::{get_user_agent_with_version, utcnow_ms}, }; use eyre::bail; -use futures::future::{join_all, select_ok}; +use futures::future::{join_all, select_all}; use reqwest::header::USER_AGENT; -use tracing::{debug, error, Instrument}; +use tracing::{debug, error, info, Instrument}; use url::Url; use crate::{ @@ -25,7 +25,7 @@ pub async fn register_validator( registrations: Vec, req_headers: HeaderMap, state: PbsState, -) -> eyre::Result { +) -> eyre::Result<()> { // prepare headers let mut send_headers = HeaderMap::new(); send_headers @@ -50,18 +50,36 @@ pub async fn register_validator( // wait for all relays registrations to complete let results = join_all(handles).await; let successful = results.iter().flatten().filter(|res| res.is_ok()).count(); - if successful > 0 { - Ok(successful) + info!(num_registrations = successful, "all relay registrations finished"); + Ok(()) } else { bail!("No relay passed register_validator successfully") } } else { // return once first completes, others proceed in background - let result = select_ok(handles).await?; - match result.0 { - Ok(_) => Ok(1), - Err(_) => bail!("No relay passed register_validator successfully"), + let mut one_success = false; + while !one_success && !handles.is_empty() { + let (result, _, remaining) = select_all(handles).await; + + one_success = result.is_ok_and(|res| res.is_ok()); + handles = remaining; + } + + if one_success { + // wait for the rest in background and log results + tokio::spawn( + async move { + let results = join_all(handles).await; + // successful + 1 since we had one success above + let successful = 1 + results.iter().flatten().filter(|res| res.is_ok()).count(); + info!(num_registrations = successful, "all relay registrations finished"); + } + .in_current_span(), + ); + Ok(()) + } else { + bail!("No relay passed register_validator successfully") } } } diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index bfdd94f1..fd73837b 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -39,27 +39,20 @@ pub async fn handle_register_validator>( } } - match A::register_validator(registrations, req_headers, state.clone()).await { - Ok(num_successful) => { - if state.pbs_config().wait_all_registrations { - info!(num_registrations = num_successful, "register validator successful"); - } else { - info!("register validator successful in at least one relay"); - } + if let Err(err) = A::register_validator(registrations, req_headers, state.clone()).await { + state.publish_event(BuilderEvent::RegisterValidatorResponse); + error!(%err, "all relays failed registration"); - BEACON_NODE_STATUS.with_label_values(&["200", REGISTER_VALIDATOR_ENDPOINT_TAG]).inc(); - Ok(StatusCode::OK) - } - Err(err) => { - state.publish_event(BuilderEvent::RegisterValidatorResponse); - error!(%err, "all relays failed registration"); + let err = PbsClientError::NoResponse; + BEACON_NODE_STATUS + .with_label_values(&[err.status_code().as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG]) + .inc(); + Err(err) + } else { + info!("register validator successful"); - let err = PbsClientError::NoResponse; - BEACON_NODE_STATUS - .with_label_values(&[err.status_code().as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG]) - .inc(); - Err(err) - } + BEACON_NODE_STATUS.with_label_values(&["200", REGISTER_VALIDATOR_ENDPOINT_TAG]).inc(); + Ok(StatusCode::OK) } } From 5963349af40e3bde24d3951573b2f4179fcdf486 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20I=C3=B1aki=20Bilbao?= Date: Fri, 17 Jan 2025 11:24:51 -0300 Subject: [PATCH 3/5] Add number of validators registered among number of relays --- .../pbs/src/mev_boost/register_validator.rs | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 7a40901e..8b0094c8 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -1,4 +1,7 @@ -use std::time::{Duration, Instant}; +use std::{ + collections::HashSet, + time::{Duration, Instant}, +}; use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::http::{HeaderMap, HeaderValue}; @@ -32,8 +35,16 @@ pub async fn register_validator( .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); + let num_validators = registrations + .iter() + .map(|registration| registration.message.pubkey) + .collect::>() + .len(); + let relays = state.all_relays().to_vec(); let mut handles = Vec::with_capacity(relays.len()); + let start_register = Instant::now(); + for relay in relays { handles.push(tokio::spawn( send_register_validator_with_timeout( @@ -49,9 +60,16 @@ pub async fn register_validator( if state.pbs_config().wait_all_registrations { // wait for all relays registrations to complete let results = join_all(handles).await; - let successful = results.iter().flatten().filter(|res| res.is_ok()).count(); - if successful > 0 { - info!(num_registrations = successful, "all relay registrations finished"); + let total_latency = start_register.elapsed(); + + let successful_responses = results.iter().flatten().filter(|res| res.is_ok()).count(); + if successful_responses > 0 { + info!( + num_responses = successful_responses, + num_registrations = num_validators, + total_latency = ?total_latency, + "all relay registrations finished" + ); Ok(()) } else { bail!("No relay passed register_validator successfully") @@ -71,9 +89,17 @@ pub async fn register_validator( tokio::spawn( async move { let results = join_all(handles).await; + let total_latency = start_register.elapsed(); + // successful + 1 since we had one success above - let successful = 1 + results.iter().flatten().filter(|res| res.is_ok()).count(); - info!(num_registrations = successful, "all relay registrations finished"); + let successful_responses = + 1 + results.iter().flatten().filter(|res| res.is_ok()).count(); + info!( + num_relays = successful_responses, + num_registrations = num_validators, + total_latency = ?total_latency, + "all relay registrations finished" + ); } .in_current_span(), ); From 03e436661af78576ff7bccdf0c7519ca1cb60c30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20I=C3=B1aki=20Bilbao?= Date: Fri, 17 Jan 2025 12:32:17 -0300 Subject: [PATCH 4/5] Change log names --- crates/pbs/src/mev_boost/register_validator.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 8b0094c8..246750f9 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -60,14 +60,14 @@ pub async fn register_validator( if state.pbs_config().wait_all_registrations { // wait for all relays registrations to complete let results = join_all(handles).await; - let total_latency = start_register.elapsed(); + let total_time = start_register.elapsed(); let successful_responses = results.iter().flatten().filter(|res| res.is_ok()).count(); if successful_responses > 0 { info!( - num_responses = successful_responses, + num_relays = successful_responses, num_registrations = num_validators, - total_latency = ?total_latency, + total_time = ?total_time, "all relay registrations finished" ); Ok(()) @@ -89,7 +89,7 @@ pub async fn register_validator( tokio::spawn( async move { let results = join_all(handles).await; - let total_latency = start_register.elapsed(); + let total_time = start_register.elapsed(); // successful + 1 since we had one success above let successful_responses = @@ -97,7 +97,7 @@ pub async fn register_validator( info!( num_relays = successful_responses, num_registrations = num_validators, - total_latency = ?total_latency, + total_time = ?total_time, "all relay registrations finished" ); } From 55e5db816f8f21d3bd874567da493a6c2996fd90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20I=C3=B1aki=20Bilbao?= Date: Wed, 22 Jan 2025 18:03:39 -0300 Subject: [PATCH 5/5] Refactor --- .../pbs/src/mev_boost/register_validator.rs | 69 ++++--------------- 1 file changed, 14 insertions(+), 55 deletions(-) diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 4edc1480..5b0ac097 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -1,7 +1,4 @@ -use std::{ - collections::HashSet, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::http::{HeaderMap, HeaderValue}; @@ -10,9 +7,9 @@ use cb_common::{ utils::{get_user_agent_with_version, utcnow_ms}, }; use eyre::bail; -use futures::future::{join_all, select_all}; +use futures::future::{join_all, select_ok}; use reqwest::header::USER_AGENT; -use tracing::{debug, error, info, Instrument}; +use tracing::{debug, error, Instrument}; use url::Url; use crate::{ @@ -35,16 +32,8 @@ pub async fn register_validator( .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let num_validators = registrations - .iter() - .map(|registration| registration.message.pubkey) - .collect::>() - .len(); - let relays = state.all_relays().to_vec(); let mut handles = Vec::with_capacity(relays.len()); - let start_register = Instant::now(); - for relay in relays.clone() { if let Some(batch_size) = relay.config.validator_registration_batch_size { for batch in registrations.chunks(batch_size) { @@ -74,52 +63,17 @@ pub async fn register_validator( if state.pbs_config().wait_all_registrations { // wait for all relays registrations to complete let results = join_all(handles).await; - let total_time = start_register.elapsed(); - - let successful_responses = results.iter().flatten().filter(|res| res.is_ok()).count(); - if successful_responses > 0 { - info!( - num_relays = successful_responses, - num_registrations = num_validators, - total_time = ?total_time, - "all relay registrations finished" - ); + if results.into_iter().any(|res| res.is_ok_and(|res| res.is_ok())) { Ok(()) } else { bail!("No relay passed register_validator successfully") } } else { // return once first completes, others proceed in background - let mut one_success = false; - while !one_success && !handles.is_empty() { - let (result, _, remaining) = select_all(handles).await; - - one_success = result.is_ok_and(|res| res.is_ok()); - handles = remaining; - } - - if one_success { - // wait for the rest in background and log results - tokio::spawn( - async move { - let results = join_all(handles).await; - let total_time = start_register.elapsed(); - - // successful + 1 since we had one success above - let successful_responses = - 1 + results.iter().flatten().filter(|res| res.is_ok()).count(); - info!( - num_relays = successful_responses, - num_registrations = num_validators, - total_time = ?total_time, - "all relay registrations finished" - ); - } - .in_current_span(), - ); - Ok(()) - } else { - bail!("No relay passed register_validator successfully") + let result = select_ok(handles).await?; + match result.0 { + Ok(_) => Ok(()), + Err(_) => bail!("No relay passed register_validator successfully"), } } } @@ -223,7 +177,12 @@ async fn send_register_validator( return Err(err); }; - debug!(?code, latency = ?request_latency, "registration successful"); + debug!( + ?code, + latency = ?request_latency, + num_registrations = registrations.len(), + "registration successful" + ); Ok(()) }