Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions bin/default_pbs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use cb_common::{
config::load_pbs_config, module_names::PBS_MODULE_NAME, utils::initialize_tracing_log,
};
use cb_common::{config::load_pbs_config, utils::initialize_pbs_tracing_log};
use cb_pbs::{DefaultBuilderApi, PbsService, PbsState};
use eyre::Result;

Expand All @@ -13,10 +11,9 @@ async fn main() -> Result<()> {
std::env::set_var("RUST_BACKTRACE", "1");
}

// TODO: handle errors
let pbs_config = load_pbs_config().expect("failed to load pbs config");
let _guard = initialize_tracing_log(PBS_MODULE_NAME);
let state = PbsState::<()>::new(pbs_config);
let pbs_config = load_pbs_config()?;
let _guard = initialize_pbs_tracing_log();
let state = PbsState::new(pbs_config);
PbsService::init_metrics()?;
PbsService::run::<(), DefaultBuilderApi>(state).await
PbsService::run::<_, DefaultBuilderApi>(state).await
}
3 changes: 2 additions & 1 deletion bin/signer_module.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use cb_common::{
config::StartSignerConfig, module_names::SIGNER_MODULE_NAME, utils::initialize_tracing_log,
config::{StartSignerConfig, SIGNER_MODULE_NAME},
utils::initialize_tracing_log,
};
use cb_signer::service::SigningService;
use eyre::Result;
Expand Down
12 changes: 9 additions & 3 deletions bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@ pub mod prelude {
pub use cb_common::{
commit,
commit::request::{SignRequest, SignedProxyDelegation},
config::{load_builder_module_config, load_commit_module_config, StartCommitModuleConfig},
config::{
load_builder_module_config, load_commit_module_config, load_pbs_config,
load_pbs_custom_config, StartCommitModuleConfig,
},
pbs::{BuilderEvent, BuilderEventClient, OnBuilderApiEvent},
utils::{initialize_tracing_log, utcnow_ms, utcnow_ns, utcnow_sec, utcnow_us},
utils::{
initialize_pbs_tracing_log, initialize_tracing_log, utcnow_ms, utcnow_ns, utcnow_sec,
utcnow_us,
},
};
pub use cb_metrics::provider::MetricsProvider;
pub use cb_pbs::{
get_header, get_status, register_validator, submit_block, BuilderApi, BuilderApiState,
PbsState,
DefaultBuilderApi, PbsService, PbsState,
};
// The TreeHash derive macro requires tree_hash:: as import
pub mod tree_hash {
Expand Down
4 changes: 4 additions & 0 deletions crates/common/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ pub const JWTS_ENV: &str = "CB_JWTS";
// TODO: replace these with an actual image in the registry
pub const PBS_DEFAULT_IMAGE: &str = "commitboost_pbs_default";
pub const SIGNER_IMAGE: &str = "commitboost_signer";

// Module names
pub const PBS_MODULE_NAME: &str = "pbs";
pub const SIGNER_MODULE_NAME: &str = "signer";
31 changes: 14 additions & 17 deletions crates/common/src/config/pbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,9 @@ pub struct StaticPbsConfig {
pub with_signer: bool,
}

/// Runtime config for the pbs module with support for custom extra config
/// This will be shared across threads, so the `extra` should be thread safe,
/// e.g. wrapped in an Arc
/// Runtime config for the pbs module
#[derive(Debug, Clone)]
pub struct PbsModuleConfig<T = ()> {
pub struct PbsModuleConfig {
/// Chain spec
pub chain: Chain,
/// Pbs default config
Expand All @@ -89,16 +87,14 @@ pub struct PbsModuleConfig<T = ()> {
pub signer_client: Option<SignerClient>,
/// Event publisher
pub event_publiher: Option<BuilderEventPublisher>,
/// Opaque module config
pub extra: T,
}

fn default_pbs() -> String {
PBS_DEFAULT_IMAGE.to_string()
}

/// Loads the default pbs config, i.e. with no signer client or custom data
pub fn load_pbs_config() -> Result<PbsModuleConfig<()>> {
pub fn load_pbs_config() -> Result<PbsModuleConfig> {
let config = CommitBoostConfig::from_env_path()?;
let relay_clients =
config.relays.into_iter().map(RelayClient::new).collect::<Result<Vec<_>>>()?;
Expand All @@ -110,12 +106,11 @@ pub fn load_pbs_config() -> Result<PbsModuleConfig<()>> {
relays: relay_clients,
signer_client: None,
event_publiher: maybe_publiher,
extra: (),
})
}

/// Loads a custom pbs config, i.e. with signer client and/or custom data
pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<PbsModuleConfig<T>> {
pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleConfig, T)> {
#[derive(Debug, Deserialize)]
struct CustomPbsConfig<U> {
#[serde(flatten)]
Expand Down Expand Up @@ -146,12 +141,14 @@ pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<PbsModuleConfig<T
None
};

Ok(PbsModuleConfig {
chain: cb_config.chain,
pbs_config: Arc::new(cb_config.pbs.static_config.pbs_config),
relays: relay_clients,
signer_client,
event_publiher: maybe_publiher,
extra: cb_config.pbs.extra,
})
Ok((
PbsModuleConfig {
chain: cb_config.chain,
pbs_config: Arc::new(cb_config.pbs.static_config.pbs_config),
relays: relay_clients,
signer_client,
event_publiher: maybe_publiher,
},
cb_config.pbs.extra,
))
}
1 change: 0 additions & 1 deletion crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pub mod config;
pub mod constants;
pub mod error;
pub mod loader;
pub mod module_names;
pub mod pbs;
pub mod signature;
pub mod signer;
Expand Down
2 changes: 0 additions & 2 deletions crates/common/src/module_names.rs

This file was deleted.

2 changes: 1 addition & 1 deletion crates/common/src/pbs/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub const SUBMIT_BLOCK_PATH: &str = "/blinded_blocks";

pub const HEADER_SLOT_UUID_KEY: &str = "X-MEVBoost-SlotID";
pub const HEADER_VERSION_KEY: &str = "X-CommitBoost-Version";
pub const HEAVER_VERSION_VALUE: &str = env!("CARGO_PKG_VERSION");
pub const HEADER_VERSION_VALUE: &str = env!("CARGO_PKG_VERSION");
pub const HEADER_START_TIME_UNIX_MS: &str = "X-MEVBoost-StartTimeUnixMS";

pub const BUILDER_EVENTS_PATH: &str = "/builder_events";
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/pbs/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use url::Url;

use super::{
constants::{BULDER_API_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH},
HEADER_VERSION_KEY, HEAVER_VERSION_VALUE,
HEADER_VERSION_KEY, HEADER_VERSION_VALUE,
};
use crate::{config::RelayConfig, DEFAULT_REQUEST_TIMEOUT};
/// A parsed entry of the relay url in the format: scheme://pubkey@host
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct RelayClient {
impl RelayClient {
pub fn new(config: RelayConfig) -> Result<Self> {
let mut headers = HeaderMap::new();
headers.insert(HEADER_VERSION_KEY, HeaderValue::from_static(HEAVER_VERSION_VALUE));
headers.insert(HEADER_VERSION_KEY, HeaderValue::from_static(HEADER_VERSION_VALUE));

if let Some(custom_headers) = &config.headers {
for (key, value) in custom_headers {
Expand Down
10 changes: 7 additions & 3 deletions crates/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation};
use tracing_subscriber::{fmt::Layer, prelude::*, EnvFilter};

use crate::{
config::{default_log_level, RollingDuration, CB_BASE_LOG_PATH},
pbs::HEAVER_VERSION_VALUE,
config::{default_log_level, RollingDuration, CB_BASE_LOG_PATH, PBS_MODULE_NAME},
pbs::HEADER_VERSION_VALUE,
types::Chain,
};

Expand Down Expand Up @@ -182,6 +182,10 @@ pub fn initialize_tracing_log(module_id: &str) -> WorkerGuard {
guard
}

pub fn initialize_pbs_tracing_log() -> WorkerGuard {
initialize_tracing_log(PBS_MODULE_NAME)
}

// all commit boost crates
// TODO: this can probably done without unwrap
fn format_crates_filter(default_level: &str, crates_level: &str) -> EnvFilter {
Expand Down Expand Up @@ -233,5 +237,5 @@ pub fn get_user_agent(req_headers: &HeaderMap) -> String {
/// Adds the commit boost version to the existing user agent
pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result<HeaderValue> {
let ua = get_user_agent(req_headers);
Ok(HeaderValue::from_str(&format!("commit-boost/{HEAVER_VERSION_VALUE} {}", ua))?)
Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {}", ua))?)
}
4 changes: 2 additions & 2 deletions crates/pbs/src/routes/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
};

#[tracing::instrument(skip_all, name = "get_header", fields(req_id = %Uuid::new_v4(), slot = params.slot))]
pub async fn handle_get_header<S: BuilderApiState, T: BuilderApi<S>>(
pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(
State(state): State<PbsState<S>>,
req_headers: HeaderMap,
Path(params): Path<GetHeaderParams>,
Expand All @@ -34,7 +34,7 @@ pub async fn handle_get_header<S: BuilderApiState, T: BuilderApi<S>>(

info!(ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot);

match T::get_header(params, req_headers, state.clone()).await {
match A::get_header(params, req_headers, state.clone()).await {
Ok(res) => {
state.publish_event(BuilderEvent::GetHeaderResponse(Box::new(res.clone())));

Expand Down
4 changes: 2 additions & 2 deletions crates/pbs/src/routes/register_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
};

#[tracing::instrument(skip_all, name = "register_validators", fields(req_id = %Uuid::new_v4()))]
pub async fn handle_register_validator<S: BuilderApiState, T: BuilderApi<S>>(
pub async fn handle_register_validator<S: BuilderApiState, A: BuilderApi<S>>(
State(state): State<PbsState<S>>,
req_headers: HeaderMap,
Json(registrations): Json<Vec<ValidatorRegistration>>,
Expand All @@ -26,7 +26,7 @@ pub async fn handle_register_validator<S: BuilderApiState, T: BuilderApi<S>>(

info!(ua, num_registrations = registrations.len());

if let Err(err) = T::register_validator(registrations, req_headers, state.clone()).await {
if let Err(err) = A::register_validator(registrations, req_headers, state.clone()).await {
state.publish_event(BuilderEvent::RegisterValidatorResponse);
error!(?err, "all relays failed registration");

Expand Down
12 changes: 6 additions & 6 deletions crates/pbs/src/routes/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ use crate::{
state::{BuilderApiState, PbsState},
};

pub fn create_app_router<S: BuilderApiState, T: BuilderApi<S>>(state: PbsState<S>) -> Router {
pub fn create_app_router<S: BuilderApiState, A: BuilderApi<S>>(state: PbsState<S>) -> Router {
let builder_routes = Router::new()
.route(GET_HEADER_PATH, get(handle_get_header::<S, T>))
.route(GET_STATUS_PATH, get(handle_get_status::<S, T>))
.route(REGISTER_VALIDATOR_PATH, post(handle_register_validator::<S, T>))
.route(SUBMIT_BLOCK_PATH, post(handle_submit_block::<S, T>));
.route(GET_HEADER_PATH, get(handle_get_header::<S, A>))
.route(GET_STATUS_PATH, get(handle_get_status::<S, A>))
.route(REGISTER_VALIDATOR_PATH, post(handle_register_validator::<S, A>))
.route(SUBMIT_BLOCK_PATH, post(handle_submit_block::<S, A>));

let builder_api = Router::new().nest(BULDER_API_PATH, builder_routes);

let app = if let Some(extra_routes) = T::extra_routes() {
let app = if let Some(extra_routes) = A::extra_routes() {
builder_api.merge(extra_routes)
} else {
builder_api
Expand Down
4 changes: 2 additions & 2 deletions crates/pbs/src/routes/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
};

#[tracing::instrument(skip_all, name = "status", fields(req_id = %Uuid::new_v4()))]
pub async fn handle_get_status<S: BuilderApiState, T: BuilderApi<S>>(
pub async fn handle_get_status<S: BuilderApiState, A: BuilderApi<S>>(
req_headers: HeaderMap,
State(state): State<PbsState<S>>,
) -> Result<impl IntoResponse, PbsClientError> {
Expand All @@ -23,7 +23,7 @@ pub async fn handle_get_status<S: BuilderApiState, T: BuilderApi<S>>(

info!(ua, relay_check = state.config.pbs_config.relay_check);

match T::get_status(req_headers, state.clone()).await {
match A::get_status(req_headers, state.clone()).await {
Ok(_) => {
state.publish_event(BuilderEvent::GetStatusResponse);
info!("relay check successful");
Expand Down
4 changes: 2 additions & 2 deletions crates/pbs/src/routes/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
};

#[tracing::instrument(skip_all, name = "submit_blinded_block", fields(req_id = %Uuid::new_v4(), slot = signed_blinded_block.message.slot))]
pub async fn handle_submit_block<S: BuilderApiState, T: BuilderApi<S>>(
pub async fn handle_submit_block<S: BuilderApiState, A: BuilderApi<S>>(
State(state): State<PbsState<S>>,
req_headers: HeaderMap,
Json(signed_blinded_block): Json<SignedBlindedBeaconBlock>,
Expand All @@ -37,7 +37,7 @@ pub async fn handle_submit_block<S: BuilderApiState, T: BuilderApi<S>>(
warn!(expected = curr_slot, got = slot, "blinded beacon slot mismatch")
}

match T::submit_block(signed_blinded_block, req_headers, state.clone()).await {
match A::submit_block(signed_blinded_block, req_headers, state.clone()).await {
Ok(res) => {
trace!(?res);
state.publish_event(BuilderEvent::SubmitBlockResponse(Box::new(res.clone())));
Expand Down
36 changes: 2 additions & 34 deletions crates/pbs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@ pub struct PbsService;
// TODO: add ServerMaxHeaderBytes

impl PbsService {
pub async fn run<S: BuilderApiState, T: BuilderApi<S>>(state: PbsState<S>) -> Result<()> {
// if state.pbs_config().relay_check {
// PbsService::relay_check(state.relays()).await;
// }

pub async fn run<S: BuilderApiState, A: BuilderApi<S>>(state: PbsState<S>) -> Result<()> {
let address = SocketAddr::from(([0, 0, 0, 0], state.config.pbs_config.port));
let events_subs =
state.config.event_publiher.as_ref().map(|e| e.n_subscribers()).unwrap_or_default();
let app = create_app_router::<S, T>(state);
let app = create_app_router::<S, A>(state);

info!(?address, events_subs, "Starting PBS service");

Expand All @@ -42,32 +38,4 @@ impl PbsService {
pub fn init_metrics() -> Result<()> {
MetricsProvider::load_and_run(PBS_METRICS_REGISTRY.clone())
}

// TODO: before starting, send a sanity check to relay
// pub async fn relay_check(relays: &[RelayEntry]) {
// info!("Sending initial relay checks");

// let mut handles = Vec::with_capacity(relays.len());

// for relay in relays {
// handles.push(Box::pin(send_relay_check(relay.clone())))
// }

// let results = join_all(handles).await;

// if !results.iter().any(|r| r.is_ok()) {
// error!("No relay passed check successfully");
// return;
// }

// for (i, res) in results.into_iter().enumerate() {
// let relay_id = relays[i].id.as_str();

// if let Err(err) = res {
// error!(?err, "Failed to get status from {relay_id}");
// } else {
// info!(relay_id, "Initial check successful")
// }
// }
// }
}
17 changes: 8 additions & 9 deletions crates/pbs/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ use uuid::Uuid;
pub trait BuilderApiState: Clone + Sync + Send + 'static {}
impl BuilderApiState for () {}

/// State for the Pbs module. It can be extended in two ways:
/// - By adding extra configs to be loaded at startup
/// - By adding extra data to the state
/// State for the Pbs module. It can be extended by adding extra data to the
/// state
#[derive(Clone)]
pub struct PbsState<U, S: BuilderApiState = ()> {
pub struct PbsState<S: BuilderApiState = ()> {
/// Config data for the Pbs service
pub config: PbsModuleConfig<U>,
pub config: PbsModuleConfig,
/// Opaque extra data for library use
pub data: S,
/// Info about the latest slot and its uuid
Expand All @@ -29,8 +28,8 @@ pub struct PbsState<U, S: BuilderApiState = ()> {
bid_cache: Arc<DashMap<u64, Vec<GetHeaderReponse>>>,
}

impl<U> PbsState<U, ()> {
pub fn new(config: PbsModuleConfig<U>) -> Self {
impl PbsState<()> {
pub fn new(config: PbsModuleConfig) -> Self {
Self {
config,
data: (),
Expand All @@ -39,7 +38,7 @@ impl<U> PbsState<U, ()> {
}
}

pub fn with_data<S: BuilderApiState>(self, data: S) -> PbsState<U, S> {
pub fn with_data<S: BuilderApiState>(self, data: S) -> PbsState<S> {
PbsState {
data,
config: self.config,
Expand All @@ -49,7 +48,7 @@ impl<U> PbsState<U, ()> {
}
}

impl<U, S> PbsState<U, S>
impl<S> PbsState<S>
where
S: BuilderApiState,
{
Expand Down
Loading