diff --git a/watchtower/src/main.rs b/watchtower/src/main.rs index 341b7903c0a33e..4b0eaaf54a451c 100644 --- a/watchtower/src/main.rs +++ b/watchtower/src/main.rs @@ -2,7 +2,7 @@ #![allow(clippy::arithmetic_side_effects)] use { - clap::{crate_description, crate_name, value_t, value_t_or_exit, App, Arg}, + clap::{crate_description, crate_name, value_t_or_exit, App, Arg}, log::*, solana_clap_utils::{ hidden_unless_forced, @@ -31,7 +31,7 @@ struct Config { address_labels: HashMap<String, String>, ignore_http_bad_gateway: bool, interval: Duration, - json_rpc_url: String, + json_rpc_urls: Vec<String>, rpc_timeout: Duration, minimum_validator_identity_balance: u64, monitor_active_stake: bool, @@ -81,11 +81,13 @@ fn get_config() -> Config { } }) .arg( - Arg::with_name("json_rpc_url") + Arg::with_name("json_rpc_urls") .long("url") .value_name("URL") .takes_value(true) .validator(is_url) + .multiple(true) + .required(true) .help("JSON RPC URL for the cluster"), ) .arg( @@ -183,8 +185,12 @@ fn get_config() -> Config { "minimum_validator_identity_balance", f64 )); - let json_rpc_url = - value_t!(matches, "json_rpc_url", String).unwrap_or_else(|_| config.json_rpc_url.clone()); + let json_rpc_urls: Vec<String> = matches + .values_of("json_rpc_urls") + .unwrap() + .map(|v| v.to_string()) + .collect(); + // let json_rpc_url = value_t!(matches, "json_rpc_url", String).unwrap_or_else(|_| config.json_rpc_url.clone()); let rpc_timeout = value_t_or_exit!(matches, "rpc_timeout", u64); let rpc_timeout = Duration::from_secs(rpc_timeout); let validator_identity_pubkeys: Vec<_> = pubkeys_of(&matches, "validator_identities") @@ -203,7 +209,7 @@ fn get_config() -> Config { address_labels: config.address_labels, ignore_http_bad_gateway, interval, - json_rpc_url, + json_rpc_urls, rpc_timeout, minimum_validator_identity_balance, monitor_active_stake, @@ -213,7 +219,7 @@ fn get_config() -> Config { name_suffix, }; - info!("RPC URL: {}", config.json_rpc_url); + info!("RPC URL: {:?}", config.json_rpc_urls); info!( "Monitored 
validators: {:?}", config.validator_identity_pubkeys @@ -245,13 +251,52 @@ fn get_cluster_info( )) } +// Evaluate the lowest latency URL, return an RPC client and a HashMap of all the latencies +// This hashmap will be updated regularly +fn get_all_latencies_and_lowest_latency_client(config: &Config) -> (HashMap<String, u128>, RpcClient) { + let mut best_url = String::from(""); + let mut lowest_latency = u128::MAX; + let mut urls_with_latencies: HashMap<String, u128> = HashMap::new(); + + info!("Evaluating all RPCs..."); + + for url in config.json_rpc_urls.iter() { + let rpc = RpcClient::new_with_timeout(url, config.rpc_timeout); + + let now = Instant::now(); + + let result = rpc.get_latest_blockhash(); + + let mut latency = now.elapsed().as_millis(); + + if latency < lowest_latency { + lowest_latency = latency; + best_url = url.clone(); + } + + if result.is_err() { + error!("{} is unhealthy!", url); + latency = u128::MAX; + } else { + info!("{} is healthy!", url); + } + + urls_with_latencies.insert(url.clone(), latency); + } + + ( + urls_with_latencies, + RpcClient::new_with_timeout(best_url, config.rpc_timeout), + ) +} + fn main() -> Result<(), Box<dyn error::Error>> { solana_logger::setup_with_default("solana=info"); solana_metrics::set_panic_hook("watchtower", /*version:*/ None); let config = get_config(); - let rpc_client = RpcClient::new_with_timeout(config.json_rpc_url.clone(), config.rpc_timeout); + let (mut urls_with_latencies, mut rpc_client) = get_all_latencies_and_lowest_latency_client(&config); let notifier = Notifier::default(); let mut last_transaction_count = 0; let mut last_recent_blockhash = Hash::default(); @@ -260,9 +305,19 @@ fn main() -> Result<(), Box<dyn error::Error>> { let mut last_success = Instant::now(); let mut incident = Hash::new_unique(); + let mut current_latency = u128::MAX; + + // Iteration count + let mut i = 0u8; loop { + info!("Current RPC: {}", rpc_client.url()); + let now = Instant::now(); + let failure = match get_cluster_info(&config, &rpc_client) { Ok((transaction_count, 
recent_blockhash, vote_accounts, validator_balances)) => { + // Update the current latency of the current url + current_latency = now.elapsed().as_millis(); + info!("Current transaction count: {}", transaction_count); info!("Recent blockhash: {}", recent_blockhash); info!("Current validator count: {}", vote_accounts.current.len()); @@ -386,6 +441,9 @@ fn main() -> Result<(), Box<dyn error::Error>> { ); num_consecutive_failures += 1; if num_consecutive_failures > config.unhealthy_threshold { + // The RPC is no longer usable. Time to use one that is more reliable + current_latency = u128::MAX; + datapoint_info!("watchtower-sanity", ("ok", false, bool)); if last_notification_msg != notification_msg { notifier.send(&notification_msg, &NotificationType::Trigger { incident }); @@ -424,6 +482,26 @@ fn main() -> Result<(), Box<dyn error::Error>> { num_consecutive_failures = 0; incident = Hash::new_unique(); } + // Update latency of the current url to the new latency + urls_with_latencies.insert(rpc_client.url().clone(), current_latency); + + // Go through all the recorded latencies: + // If any url has a lower latency than current_latency, switch to the first one found. + for (url, latency) in urls_with_latencies.iter() { + if *latency < current_latency { + info!("Switching to {}", url); + rpc_client = RpcClient::new_with_timeout(url.clone(), config.rpc_timeout); + break; + } + } + + i += 1u8; + + // After every 10 iterations, re-evaluate all RPC healths + if i >= 10u8 { + i = 0u8; + (urls_with_latencies, rpc_client) = get_all_latencies_and_lowest_latency_client(&config); + } sleep(config.interval); } }