Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 86 additions & 8 deletions watchtower/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![allow(clippy::arithmetic_side_effects)]

use {
clap::{crate_description, crate_name, value_t, value_t_or_exit, App, Arg},
clap::{crate_description, crate_name, value_t_or_exit, App, Arg},
log::*,
solana_clap_utils::{
hidden_unless_forced,
Expand Down Expand Up @@ -31,7 +31,7 @@ struct Config {
address_labels: HashMap<String, String>,
ignore_http_bad_gateway: bool,
interval: Duration,
json_rpc_url: String,
json_rpc_urls: Vec<String>,
rpc_timeout: Duration,
minimum_validator_identity_balance: u64,
monitor_active_stake: bool,
Expand Down Expand Up @@ -81,11 +81,13 @@ fn get_config() -> Config {
}
})
.arg(
Arg::with_name("json_rpc_url")
Arg::with_name("json_rpc_urls")
.long("url")
.value_name("URL")
.takes_value(true)
.validator(is_url)
.multiple(true)
.required(true)
.help("JSON RPC URL for the cluster"),
)
.arg(
Expand Down Expand Up @@ -183,8 +185,12 @@ fn get_config() -> Config {
"minimum_validator_identity_balance",
f64
));
let json_rpc_url =
value_t!(matches, "json_rpc_url", String).unwrap_or_else(|_| config.json_rpc_url.clone());
let json_rpc_urls: Vec<String> = matches
.values_of("json_rpc_urls")
.unwrap()
.map(|v| v.to_string())
.collect();
// let json_rpc_url = value_t!(matches, "json_rpc_url", String).unwrap_or_else(|_| config.json_rpc_url.clone());
let rpc_timeout = value_t_or_exit!(matches, "rpc_timeout", u64);
let rpc_timeout = Duration::from_secs(rpc_timeout);
let validator_identity_pubkeys: Vec<_> = pubkeys_of(&matches, "validator_identities")
Expand All @@ -203,7 +209,7 @@ fn get_config() -> Config {
address_labels: config.address_labels,
ignore_http_bad_gateway,
interval,
json_rpc_url,
json_rpc_urls,
rpc_timeout,
minimum_validator_identity_balance,
monitor_active_stake,
Expand All @@ -213,7 +219,7 @@ fn get_config() -> Config {
name_suffix,
};

info!("RPC URL: {}", config.json_rpc_url);
info!("RPC URL: {:?}", config.json_rpc_urls);
info!(
"Monitored validators: {:?}",
config.validator_identity_pubkeys
Expand Down Expand Up @@ -245,13 +251,52 @@ fn get_cluster_info(
))
}

// Evaluate the lowest latency URL, return an RPC client and a HashMap of all the latencies
// This hashmap will be updated regularly
fn get_all_latencies_and_lowest_latency_client(config: &Config) -> (HashMap<String, u128>, RpcClient) {
let mut best_url = String::from("");
let mut lowest_latency = u128::MAX;
let mut urls_with_latencies: HashMap<String, u128> = HashMap::new();

info!("Evaluating all RPCs...");

for url in config.json_rpc_urls.iter() {
let rpc = RpcClient::new_with_timeout(url, config.rpc_timeout);

let now = Instant::now();

let result = rpc.get_latest_blockhash();

let mut latency = now.elapsed().as_millis();

if latency < lowest_latency {
lowest_latency = latency;
best_url = url.clone();
}

if result.is_err() {
error!("{} is unhealthy!", url);
latency = u128::MAX;
} else {
info!("{} is healthy!", url);
}

urls_with_latencies.insert(url.clone(), latency);
}

(
urls_with_latencies,
RpcClient::new_with_timeout(best_url, config.rpc_timeout),
)
}

fn main() -> Result<(), Box<dyn error::Error>> {
solana_logger::setup_with_default("solana=info");
solana_metrics::set_panic_hook("watchtower", /*version:*/ None);

let config = get_config();

let rpc_client = RpcClient::new_with_timeout(config.json_rpc_url.clone(), config.rpc_timeout);
let (mut urls_with_latencies, mut rpc_client) = get_all_latencies_and_lowest_latency_client(&config);
let notifier = Notifier::default();
let mut last_transaction_count = 0;
let mut last_recent_blockhash = Hash::default();
Expand All @@ -260,9 +305,19 @@ fn main() -> Result<(), Box<dyn error::Error>> {
let mut last_success = Instant::now();
let mut incident = Hash::new_unique();

let mut current_latency = u128::MAX;

// Iteration count
let mut i = 0u8;
loop {
info!("Current RPC: {}", rpc_client.url());
let now = Instant::now();

let failure = match get_cluster_info(&config, &rpc_client) {
Ok((transaction_count, recent_blockhash, vote_accounts, validator_balances)) => {
// Update the current latency of the current url
current_latency = now.elapsed().as_millis();

info!("Current transaction count: {}", transaction_count);
info!("Recent blockhash: {}", recent_blockhash);
info!("Current validator count: {}", vote_accounts.current.len());
Expand Down Expand Up @@ -386,6 +441,9 @@ fn main() -> Result<(), Box<dyn error::Error>> {
);
num_consecutive_failures += 1;
if num_consecutive_failures > config.unhealthy_threshold {
// The RPC is no longer usable. Time to use one that is more reliable
current_latency = u128::MAX;

datapoint_info!("watchtower-sanity", ("ok", false, bool));
if last_notification_msg != notification_msg {
notifier.send(&notification_msg, &NotificationType::Trigger { incident });
Expand Down Expand Up @@ -424,6 +482,26 @@ fn main() -> Result<(), Box<dyn error::Error>> {
num_consecutive_failures = 0;
incident = Hash::new_unique();
}
// Update latency of the current url to the new latency
urls_with_latencies.insert(rpc_client.url().clone(), current_latency);

// Go through all the previous latencies:
// If current_latency is higher that the first found lower latency url, pick that one.
for (url, latency) in urls_with_latencies.iter() {
if *latency < current_latency {
info!("Switching to {}", url);
rpc_client = RpcClient::new_with_timeout(url.clone(), config.rpc_timeout);
break;
}
}

i += 1u8;

// After every x iterations, re-evaluate all RPC healths
if i >= 10u8 {
i = 0u8;
(urls_with_latencies, rpc_client) = get_all_latencies_and_lowest_latency_client(&config);
}
sleep(config.interval);
}
}