diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 1922552f..5e42b6c6 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -171,8 +171,12 @@ pub struct PbsModuleConfig { pub endpoint: SocketAddr, /// Pbs default config pub pbs_config: Arc, - /// List of relays + /// List of default relays pub relays: Vec, + /// List of all default relays plus additional relays from muxes (based on + /// URL) DO NOT use this for get_header calls, use `relays` or `muxes` + /// instead + pub all_relays: Vec, /// Signer client to call Signer API pub signer_client: Option, /// Event publisher @@ -208,12 +212,32 @@ pub async fn load_pbs_config() -> Result { let relay_clients = config.relays.into_iter().map(RelayClient::new).collect::>>()?; let maybe_publiher = BuilderEventPublisher::new_from_env()?; + let mut all_relays = HashMap::with_capacity(relay_clients.len()); + + if let Some(muxes) = &muxes { + for (_, mux) in muxes.iter() { + for relay in mux.relays.iter() { + all_relays.insert(&relay.config.entry.url, relay.clone()); + } + } + } + + // insert default relays after to make sure we keep these as defaults, + // this means we override timing games which is ok since this won't be used for + // get_header we also override headers if the same relays has two + // definitions (in muxes and default) + for relay in relay_clients.iter() { + all_relays.insert(&relay.config.entry.url, relay.clone()); + } + + let all_relays = all_relays.into_values().collect(); Ok(PbsModuleConfig { chain: config.chain, endpoint, pbs_config: Arc::new(config.pbs.pbs_config), relays: relay_clients, + all_relays, signer_client: None, event_publisher: maybe_publiher, muxes, @@ -264,6 +288,25 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect::>>()?; let maybe_publiher = BuilderEventPublisher::new_from_env()?; + let mut all_relays = HashMap::with_capacity(relay_clients.len()); + + if let Some(muxes) = &muxes { + for (_, mux) in muxes.iter() { + for relay in mux.relays.iter() { + all_relays.insert(&relay.config.entry.url, relay.clone()); + } + } + } + + // insert default relays after to make sure we keep these as defaults, + // this also means we override timing games which is ok since this won't be used + // for get header we also override headers if the same relays has two + // definitions (in muxes and default) + for relay in relay_clients.iter() { + all_relays.insert(&relay.config.entry.url, relay.clone()); + } + + let all_relays = all_relays.into_values().collect(); let signer_client = if cb_config.pbs.static_config.with_signer { // if custom pbs requires a signer client, load jwt @@ -280,6 +323,7 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC endpoint, pbs_config: Arc::new(cb_config.pbs.static_config.pbs_config), relays: relay_clients, + all_relays, signer_client, event_publisher: maybe_publiher, muxes, diff --git a/crates/common/src/pbs/relay.rs b/crates/common/src/pbs/relay.rs index db18466a..c74a6516 100644 --- a/crates/common/src/pbs/relay.rs +++ b/crates/common/src/pbs/relay.rs @@ -50,7 +50,8 @@ impl<'de> Deserialize<'de> for RelayEntry { } } -/// A client to interact with a relay, safe to share across threads +/// A client to interact with a relay, safe to share across threads and cheaply +/// cloneable #[derive(Debug, Clone)] pub struct RelayClient { /// ID of the relay diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 9f995a86..6a745319 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -32,7 +32,7 @@ pub async fn register_validator( .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let relays = state.relays().to_vec(); + let relays = state.all_relays().to_vec(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays { handles.push(tokio::spawn( diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index 5fdf7db5..7d9d67d2 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -31,7 +31,7 @@ pub async fn get_status( let mut send_headers = HeaderMap::new(); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let relays = state.relays(); + let relays = state.all_relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays { handles.push(Box::pin(send_relay_check(relay, send_headers.clone()))); diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 751bf2e4..0d03816b 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -32,7 +32,7 @@ pub async fn submit_block( send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms())); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let relays = state.relays(); + let relays = state.all_relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { handles.push(Box::pin(submit_block_with_timeout( diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index 9eb06eb6..6b3f15c9 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -43,9 +43,13 @@ where &self.config.pbs_config } - pub fn relays(&self) -> &[RelayClient] { - &self.config.relays + /// Returns all the relays (including those in muxes) + /// DO NOT use this through the PBS module, use + /// [`PbsState::mux_config_and_relays`] instead + pub fn all_relays(&self) -> &[RelayClient] { + &self.config.all_relays } + /// Returns the PBS config and relay clients for the given validator pubkey. /// If the pubkey is not found in any mux, the default configs are /// returned @@ -55,7 +59,8 @@ where ) -> (&PbsConfig, &[RelayClient], Option<&str>) { match self.config.muxes.as_ref().and_then(|muxes| muxes.get(pubkey)) { Some(mux) => (&mux.config, mux.relays.as_slice(), Some(&mux.id)), - None => (self.pbs_config(), self.relays(), None), + // return only the default relays if there's no match + None => (self.pbs_config(), &self.config.relays, None), } } diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index 88dea73c..cc8f5d27 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -48,6 +48,7 @@ fn to_pbs_config(chain: Chain, pbs_config: PbsConfig, relays: Vec) pbs_config: Arc::new(pbs_config), signer_client: None, event_publisher: None, + all_relays: relays.clone(), relays, muxes: None, } @@ -204,24 +205,28 @@ async fn test_submit_block_too_large() -> Result<()> { async fn test_mux() -> Result<()> { setup_test_env(); let signer = random_secret(); - let pubkey_1: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - let signer_2 = random_secret(); - let pubkey_2: BlsPublicKey = blst_pubkey_to_alloy(&signer_2.sk_to_pk()).into(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); let chain = Chain::Holesky; let port = 3600; - let mux_relay = generate_mock_relay(port + 1, *pubkey_1)?; - let relays = vec![mux_relay.clone(), generate_mock_relay(port + 2, *pubkey_2)?]; + let mux_relay_1 = generate_mock_relay(port + 1, *pubkey)?; + let mux_relay_2 = generate_mock_relay(port + 2, *pubkey)?; + let default_relay = generate_mock_relay(port + 3, *pubkey)?; + let mock_state = Arc::new(MockRelayState::new(chain, signer)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 2)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 3)); + let relays = vec![default_relay.clone()]; let mut config = to_pbs_config(chain, get_pbs_static_config(port), relays); + config.all_relays = vec![mux_relay_1.clone(), mux_relay_2.clone(), default_relay.clone()]; + let mux = RuntimeMuxConfig { id: String::from("test"), config: config.pbs_config.clone(), - relays: vec![mux_relay], + relays: vec![mux_relay_1, mux_relay_2], }; let validator_pubkey = blst_pubkey_to_alloy(&random_secret().sk_to_pk()); @@ -239,12 +244,28 @@ async fn test_mux() -> Result<()> { let res = mock_validator.do_get_header(None).await; assert!(res.is_ok()); - assert_eq!(mock_state.received_get_header(), 2); // both relays were used + assert_eq!(mock_state.received_get_header(), 1); // only default relay was used info!("Sending get header with mux"); let res = mock_validator.do_get_header(Some(validator_pubkey)).await; assert!(res.is_ok()); - assert_eq!(mock_state.received_get_header(), 3); // only one relay was used + assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used + + let res = mock_validator.do_get_status().await; + + assert!(res.is_ok()); + assert_eq!(mock_state.received_get_status(), 3); // default + 2 mux relays were used + + let res = mock_validator.do_register_validator().await; + + assert!(res.is_ok()); + assert_eq!(mock_state.received_register_validator(), 3); // default + 2 mux relays were used + + let res = mock_validator.do_submit_block().await; + + assert!(res.is_err()); + assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used + Ok(()) }