diff --git a/Cargo.lock b/Cargo.lock index a216c35fc0f7d4..94a82ed502d600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9657,7 +9657,6 @@ dependencies = [ "solana-instruction", "solana-keypair", "solana-last-restart-slot", - "solana-metrics", "solana-program-entrypoint", "solana-program-runtime", "solana-pubkey", @@ -9670,6 +9669,7 @@ dependencies = [ "solana-svm-feature-set", "solana-svm-log-collector", "solana-svm-measure", + "solana-svm-metrics", "solana-svm-timings", "solana-svm-transaction", "solana-svm-type-overrides", @@ -10969,6 +10969,24 @@ dependencies = [ name = "solana-svm-measure" version = "3.0.0" +[[package]] +name = "solana-svm-metrics" +version = "3.0.0" +dependencies = [ + "bencher", + "crossbeam-channel", + "env_logger 0.11.8", + "gethostname", + "log", + "rand 0.8.5", + "reqwest 0.12.22", + "serial_test", + "solana-cluster-type", + "solana-sha256-hasher", + "solana-time-utils", + "thiserror 2.0.12", +] + [[package]] name = "solana-svm-timings" version = "3.0.0" diff --git a/Cargo.toml b/Cargo.toml index 5ba4ddda627711..b35a0fe26f2b23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ members = [ "svm-feature-set", "svm-log-collector", "svm-measure", + "svm-metrics", "svm-timings", "svm-transaction", "svm-type-overrides", @@ -525,6 +526,7 @@ solana-svm-callback = { path = "svm-callback", version = "=3.0.0" } solana-svm-feature-set = { path = "svm-feature-set", version = "=3.0.0" } solana-svm-log-collector = { path = "svm-log-collector", version = "=3.0.0" } solana-svm-measure = { path = "svm-measure", version = "=3.0.0" } +solana-svm-metrics = { path = "svm-metrics", version = "=3.0.0" } solana-svm-timings = { path = "svm-timings", version = "=3.0.0" } solana-svm-transaction = { path = "svm-transaction", version = "=3.0.0" } solana-svm-type-overrides = { path = "svm-type-overrides", version = "=3.0.0" } diff --git a/program-runtime/Cargo.toml b/program-runtime/Cargo.toml index ed103250caa0d0..4f897363aa4900 100644 --- 
a/program-runtime/Cargo.toml +++ b/program-runtime/Cargo.toml @@ -20,7 +20,7 @@ name = "solana_program_runtime" dev-context-only-utils = [] dummy-for-ci-check = ["metrics"] frozen-abi = ["dep:solana-frozen-abi", "dep:solana-frozen-abi-macro"] -metrics = ["dep:solana-metrics"] +metrics = ["dep:solana-svm-metrics"] shuttle-test = ["solana-sbpf/shuttle-test", "solana-svm-type-overrides/shuttle-test"] [dependencies] @@ -45,7 +45,6 @@ solana-frozen-abi-macro = { workspace = true, optional = true, features = [ solana-hash = { workspace = true } solana-instruction = { workspace = true } solana-last-restart-slot = { workspace = true } -solana-metrics = { workspace = true, optional = true } solana-program-entrypoint = { workspace = true } solana-pubkey = { workspace = true } solana-rent = { workspace = true } @@ -56,6 +55,7 @@ solana-svm-callback = { workspace = true } solana-svm-feature-set = { workspace = true } solana-svm-log-collector = { workspace = true } solana-svm-measure = { workspace = true } +solana-svm-metrics = { workspace = true, optional = true } solana-svm-timings = { workspace = true } solana-svm-transaction = { workspace = true } solana-svm-type-overrides = { workspace = true } diff --git a/program-runtime/src/lib.rs b/program-runtime/src/lib.rs index 4942513590016a..faca53168c65a3 100644 --- a/program-runtime/src/lib.rs +++ b/program-runtime/src/lib.rs @@ -4,7 +4,7 @@ #[cfg(feature = "metrics")] #[macro_use] -extern crate solana_metrics; +extern crate solana_svm_metrics; pub use solana_sbpf; pub mod execution_budget; diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 30f47c81b8b22f..bbbc315b6a916c 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -7494,7 +7494,6 @@ dependencies = [ "solana-hash", "solana-instruction", "solana-last-restart-slot", - "solana-metrics", "solana-program-entrypoint", "solana-pubkey", "solana-rent", @@ -7505,6 +7504,7 @@ dependencies = [ "solana-svm-feature-set", "solana-svm-log-collector", 
"solana-svm-measure", + "solana-svm-metrics", "solana-svm-timings", "solana-svm-transaction", "solana-svm-type-overrides", @@ -9292,6 +9292,20 @@ dependencies = [ name = "solana-svm-measure" version = "3.0.0" +[[package]] +name = "solana-svm-metrics" +version = "3.0.0" +dependencies = [ + "crossbeam-channel", + "gethostname", + "log", + "reqwest 0.12.22", + "solana-cluster-type", + "solana-sha256-hasher", + "solana-time-utils", + "thiserror 2.0.12", +] + [[package]] name = "solana-svm-timings" version = "3.0.0" diff --git a/svm-metrics/Cargo.toml b/svm-metrics/Cargo.toml new file mode 100644 index 00000000000000..906b8ef2d6d917 --- /dev/null +++ b/svm-metrics/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "solana-svm-metrics" +description = "Metrics collection for SVM" +documentation = "https://docs.rs/solana-svm-metrics" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[lib] +name = "solana_svm_metrics" + +[dependencies] +crossbeam-channel = { workspace = true } +gethostname = { workspace = true } +log = { workspace = true } +reqwest = { workspace = true, features = ["blocking", "brotli", "deflate", "gzip", "rustls-tls", "json"] } +solana-cluster-type = { workspace = true } +solana-sha256-hasher = { workspace = true } +solana-time-utils = { workspace = true } +thiserror = { workspace = true } + +[dev-dependencies] +bencher = { workspace = true } +env_logger = { workspace = true } +rand = { workspace = true } +serial_test = { workspace = true } + +[[bench]] +name = "metrics" +harness = false diff --git a/svm-metrics/benches/metrics.rs b/svm-metrics/benches/metrics.rs new file mode 100644 index 00000000000000..4d603d0387cea9 --- /dev/null +++ b/svm-metrics/benches/metrics.rs @@ -0,0 +1,92 @@ +use { + bencher::{benchmark_group, benchmark_main, 
Bencher}, + log::*, + rand::distributions::{Distribution, Uniform}, + solana_svm_metrics::{ + counter::CounterPoint, + datapoint::DataPoint, + metrics::{serialize_points, test_mocks::MockMetricsWriter, MetricsAgent}, + }, + std::{hint::black_box, sync::Arc, time::Duration}, +}; + +fn bench_write_points(b: &mut Bencher) { + let points = (0..10) + .map(|_| { + DataPoint::new("measurement") + .add_field_i64("i", 0) + .add_field_i64("abc123", 2) + .add_field_i64("this-is-my-very-long-field-name", 3) + .clone() + }) + .collect(); + let host_id = "benchmark-host-id"; + b.iter(|| { + for _ in 0..10 { + black_box(serialize_points(&points, host_id)); + } + }) +} + +fn bench_datapoint_submission(b: &mut Bencher) { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer, Duration::from_secs(10), 1000); + + b.iter(|| { + for i in 0..1000 { + agent.submit( + DataPoint::new("measurement") + .add_field_i64("i", i) + .to_owned(), + Level::Info, + ); + } + agent.flush(); + }) +} + +fn bench_counter_submission(b: &mut Bencher) { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer, Duration::from_secs(10), 1000); + + b.iter(|| { + for i in 0..1000 { + agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i); + } + agent.flush(); + }) +} + +fn bench_random_submission(b: &mut Bencher) { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer, Duration::from_secs(10), 1000); + let mut rng = rand::thread_rng(); + let die = Uniform::::from(1..7); + + b.iter(|| { + for i in 0..1000 { + let dice = die.sample(&mut rng); + + if dice == 6 { + agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i); + } else { + agent.submit( + DataPoint::new("measurement") + .add_field_i64("i", i as i64) + .to_owned(), + Level::Info, + ); + } + } + agent.flush(); + }) +} + +benchmark_group!( + benches, + bench_write_points, + bench_datapoint_submission, + bench_counter_submission, 
+ bench_random_submission +); +benchmark_main!(benches); diff --git a/svm-metrics/src/counter.rs b/svm-metrics/src/counter.rs new file mode 100644 index 00000000000000..21ba51ee46b0a7 --- /dev/null +++ b/svm-metrics/src/counter.rs @@ -0,0 +1,326 @@ +use { + crate::metrics::submit_counter, + log::*, + std::{ + env, + sync::atomic::{AtomicU64, AtomicUsize, Ordering}, + time::SystemTime, + }, +}; + +const DEFAULT_LOG_RATE: usize = 1000; +// Submit a datapoint every second by default +const DEFAULT_METRICS_RATE: u64 = 1000; + +pub struct Counter { + pub name: &'static str, + /// total accumulated value + pub counts: AtomicUsize, + pub times: AtomicUsize, + /// last accumulated value logged + pub lastlog: AtomicUsize, + pub lograte: AtomicUsize, + pub metricsrate: AtomicU64, +} + +#[derive(Clone, Debug)] +pub struct CounterPoint { + pub name: &'static str, + pub count: i64, + pub timestamp: SystemTime, +} + +impl CounterPoint { + pub fn new(name: &'static str) -> Self { + CounterPoint { + name, + count: 0, + timestamp: std::time::UNIX_EPOCH, + } + } +} + +#[macro_export] +macro_rules! create_counter { + ($name:expr, $lograte:expr, $metricsrate:expr) => { + $crate::counter::Counter { + name: $name, + counts: std::sync::atomic::AtomicUsize::new(0), + times: std::sync::atomic::AtomicUsize::new(0), + lastlog: std::sync::atomic::AtomicUsize::new(0), + lograte: std::sync::atomic::AtomicUsize::new($lograte), + metricsrate: std::sync::atomic::AtomicU64::new($metricsrate), + } + }; +} + +#[macro_export] +macro_rules! inc_counter { + ($name:expr, $level:expr, $count:expr) => { + $name.inc($level, $count) + }; +} + +#[macro_export] +macro_rules! inc_counter_info { + ($name:expr, $count:expr) => { + if log_enabled!(log::Level::Info) { + $name.inc(log::Level::Info, $count) + } + }; +} + +#[macro_export] +macro_rules! 
inc_new_counter { + ($name:expr, $count:expr, $level:expr, $lograte:expr, $metricsrate:expr) => {{ + if log_enabled!($level) { + static INC_NEW_COUNTER: std::sync::LazyLock<$crate::counter::Counter> = + std::sync::LazyLock::new(|| { + let mut counter = create_counter!($name, $lograte, $metricsrate); + counter.init(); + counter + }); + + INC_NEW_COUNTER.inc($level, $count); + } + }}; +} + +#[macro_export] +macro_rules! inc_new_counter_error { + ($name:expr, $count:expr) => {{ + inc_new_counter!($name, $count, log::Level::Error, 0, 0); + }}; + ($name:expr, $count:expr, $lograte:expr) => {{ + inc_new_counter!($name, $count, log::Level::Error, $lograte, 0); + }}; + ($name:expr, $count:expr, $lograte:expr, $metricsrate:expr) => {{ + inc_new_counter!($name, $count, log::Level::Error, $lograte, $metricsrate); + }}; +} + +#[macro_export] +macro_rules! inc_new_counter_warn { + ($name:expr, $count:expr) => {{ + inc_new_counter!($name, $count, log::Level::Warn, 0, 0); + }}; + ($name:expr, $count:expr, $lograte:expr) => {{ + inc_new_counter!($name, $count, log::Level::Warn, $lograte, 0); + }}; + ($name:expr, $count:expr, $lograte:expr, $metricsrate:expr) => {{ + inc_new_counter!($name, $count, log::Level::Warn, $lograte, $metricsrate); + }}; +} + +#[macro_export] +macro_rules! inc_new_counter_info { + ($name:expr, $count:expr) => {{ + inc_new_counter!($name, $count, log::Level::Info, 0, 0); + }}; + ($name:expr, $count:expr, $lograte:expr) => {{ + inc_new_counter!($name, $count, log::Level::Info, $lograte, 0); + }}; + ($name:expr, $count:expr, $lograte:expr, $metricsrate:expr) => {{ + inc_new_counter!($name, $count, log::Level::Info, $lograte, $metricsrate); + }}; +} + +#[macro_export] +macro_rules! 
inc_new_counter_debug { + ($name:expr, $count:expr) => {{ + inc_new_counter!($name, $count, log::Level::Debug, 0, 0); + }}; + ($name:expr, $count:expr, $lograte:expr) => {{ + inc_new_counter!($name, $count, log::Level::Debug, $lograte, 0); + }}; + ($name:expr, $count:expr, $lograte:expr, $metricsrate:expr) => {{ + inc_new_counter!($name, $count, log::Level::Debug, $lograte, $metricsrate); + }}; +} + +impl Counter { + fn default_metrics_rate() -> u64 { + let v = env::var("SOLANA_DEFAULT_METRICS_RATE") + .map(|x| x.parse().unwrap_or(0)) + .unwrap_or(0); + if v == 0 { + DEFAULT_METRICS_RATE + } else { + v + } + } + fn default_log_rate() -> usize { + let v = env::var("SOLANA_DEFAULT_LOG_RATE") + .map(|x| x.parse().unwrap_or(DEFAULT_LOG_RATE)) + .unwrap_or(DEFAULT_LOG_RATE); + if v == 0 { + DEFAULT_LOG_RATE + } else { + v + } + } + pub fn init(&mut self) { + #![allow(deprecated)] + self.lograte + .compare_and_swap(0, Self::default_log_rate(), Ordering::Relaxed); + self.metricsrate + .compare_and_swap(0, Self::default_metrics_rate(), Ordering::Relaxed); + } + pub fn inc(&self, level: log::Level, events: usize) { + let now = solana_time_utils::timestamp(); + let counts = self.counts.fetch_add(events, Ordering::Relaxed); + let times = self.times.fetch_add(1, Ordering::Relaxed); + let lograte = self.lograte.load(Ordering::Relaxed); + let metricsrate = self.metricsrate.load(Ordering::Relaxed); + + if times % lograte == 0 && times > 0 && log_enabled!(level) { + log!(level, + "COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}, \"events\": {}}}", + self.name, + counts + events, + times, + now, + events, + ); + } + + let lastlog = self.lastlog.load(Ordering::Relaxed); + #[allow(deprecated)] + let prev = self + .lastlog + .compare_and_swap(lastlog, counts, Ordering::Relaxed); + if prev == lastlog { + let bucket = now / metricsrate; + let counter = CounterPoint { + name: self.name, + count: counts as i64 - lastlog as i64, + timestamp: SystemTime::now(), + }; 
+ submit_counter(counter, level, bucket); + } + } +} +#[cfg(test)] +mod tests { + use { + crate::counter::{Counter, DEFAULT_LOG_RATE, DEFAULT_METRICS_RATE}, + log::{Level, *}, + serial_test::serial, + std::{ + env, + sync::{atomic::Ordering, RwLock}, + }, + }; + + fn get_env_lock() -> &'static RwLock<()> { + static ENV_LOCK: RwLock<()> = RwLock::new(()); + &ENV_LOCK + } + + /// Try to initialize the logger with a filter level of INFO. + /// + /// Incrementing a counter only happens if the logger is configured for the + /// given log level, so the tests need an INFO logger to pass. + fn try_init_logger_at_level_info() -> Result<(), log::SetLoggerError> { + // Use ::new() to configure the logger manually, instead of using the + // default of reading the RUST_LOG environment variable. Set is_test to + // print to stdout captured by the test runner, instead of polluting the + // test runner output. + let module_limit = None; + env_logger::Builder::new() + .filter(module_limit, log::LevelFilter::Info) + .is_test(true) + .try_init() + } + + #[test] + #[serial] + fn test_counter() { + try_init_logger_at_level_info().ok(); + let _readlock = get_env_lock().read(); + let mut counter = create_counter!("test", 1000, 1); + counter.init(); + counter.inc(Level::Info, 1); + assert_eq!(counter.counts.load(Ordering::Relaxed), 1); + assert_eq!(counter.times.load(Ordering::Relaxed), 1); + assert_eq!(counter.lograte.load(Ordering::Relaxed), 1000); + assert_eq!(counter.lastlog.load(Ordering::Relaxed), 0); + assert_eq!(counter.name, "test"); + for _ in 0..199 { + counter.inc(Level::Info, 2); + } + assert_eq!(counter.lastlog.load(Ordering::Relaxed), 397); + counter.inc(Level::Info, 2); + assert_eq!(counter.lastlog.load(Ordering::Relaxed), 399); + } + + #[test] + #[serial] + fn test_metricsrate() { + try_init_logger_at_level_info().ok(); + let _readlock = get_env_lock().read(); + env::remove_var("SOLANA_DEFAULT_METRICS_RATE"); + let mut counter = create_counter!("test", 1000, 0); + 
counter.init(); + assert_eq!( + counter.metricsrate.load(Ordering::Relaxed), + DEFAULT_METRICS_RATE + ); + } + + #[test] + #[serial] + fn test_metricsrate_env() { + try_init_logger_at_level_info().ok(); + let _writelock = get_env_lock().write(); + env::set_var("SOLANA_DEFAULT_METRICS_RATE", "50"); + let mut counter = create_counter!("test", 1000, 0); + counter.init(); + assert_eq!(counter.metricsrate.load(Ordering::Relaxed), 50); + } + + #[test] + #[serial] + fn test_inc_new_counter() { + let _readlock = get_env_lock().read(); + //make sure that macros are syntactically correct + //the variable is internal to the macro scope so there is no way to introspect it + inc_new_counter_info!("1", 1); + inc_new_counter_info!("2", 1, 3); + inc_new_counter_info!("3", 1, 2, 1); + } + + #[test] + #[serial] + fn test_lograte() { + try_init_logger_at_level_info().ok(); + let _readlock = get_env_lock().read(); + assert_eq!( + Counter::default_log_rate(), + DEFAULT_LOG_RATE, + "default_log_rate() is {}, expected {}, SOLANA_DEFAULT_LOG_RATE environment variable set?", + Counter::default_log_rate(), + DEFAULT_LOG_RATE, + ); + let mut counter = create_counter!("test_lograte", 0, 1); + counter.init(); + assert_eq!(counter.lograte.load(Ordering::Relaxed), DEFAULT_LOG_RATE); + } + + #[test] + #[serial] + fn test_lograte_env() { + try_init_logger_at_level_info().ok(); + assert_ne!(DEFAULT_LOG_RATE, 0); + let _writelock = get_env_lock().write(); + let mut counter = create_counter!("test_lograte_env", 0, 1); + env::set_var("SOLANA_DEFAULT_LOG_RATE", "50"); + counter.init(); + assert_eq!(counter.lograte.load(Ordering::Relaxed), 50); + + let mut counter2 = create_counter!("test_lograte_env", 0, 1); + env::set_var("SOLANA_DEFAULT_LOG_RATE", "0"); + counter2.init(); + assert_eq!(counter2.lograte.load(Ordering::Relaxed), DEFAULT_LOG_RATE); + } +} diff --git a/svm-metrics/src/datapoint.rs b/svm-metrics/src/datapoint.rs new file mode 100644 index 00000000000000..e2740ce3aecc47 --- /dev/null +++ 
b/svm-metrics/src/datapoint.rs @@ -0,0 +1,354 @@ +//! This file defines a set of macros for reporting metrics. +//! +//! To report a metric, simply call one of the following datapoint macros +//! with a suitable message level: +//! +//! - datapoint_error! +//! - datapoint_warn! +//! - datapoint_trace! +//! - datapoint_info! +//! - datapoint_debug! +//! +//! The metric macro consists of the following three main parts: +//! - name: the name of the metric. +//! +//! - tags (optional): when a metric sample is reported with tags, you can use +//! group-by when querying the reported samples. Each metric sample can be +//! attached with zero to many tags. Each tag is of the format: +//! +//! - "tag-name" => "tag-value" +//! +//! - fields (optional): fields are the main content of a metric sample. The +//! macro supports four different types of fields: bool, i64, f64, and String. +//! Here is their syntax: +//! +//! - ("field-name", "field-value", bool) +//! - ("field-name", "field-value", i64) +//! - ("field-name", "field-value", f64) +//! - ("field-name", "field-value", String) +//! +//! Example: +//! +//! datapoint_debug!( +//! "name-of-the-metric", +//! "tag" => "tag-value", +//! "tag2" => "tag-value2", +//! ("some-bool", false, bool), +//! ("some-int", 100, i64), +//! ("some-float", 1.05, f64), +//! ("some-string", "field-value", String), +//! ); +//! +use std::{fmt, time::SystemTime}; + +#[derive(Clone, Debug)] +pub struct DataPoint { + pub name: &'static str, + pub timestamp: SystemTime, + /// tags are eligible for group-by operations. 
+ pub tags: Vec<(&'static str, String)>, + pub fields: Vec<(&'static str, String)>, +} + +impl DataPoint { + pub fn new(name: &'static str) -> Self { + DataPoint { + name, + timestamp: SystemTime::now(), + tags: vec![], + fields: vec![], + } + } + + pub fn add_tag(&mut self, name: &'static str, value: &str) -> &mut Self { + self.tags.push((name, value.to_string())); + self + } + + pub fn add_field_str(&mut self, name: &'static str, value: &str) -> &mut Self { + self.fields + .push((name, format!("\"{}\"", value.replace('\"', "\\\"")))); + self + } + + pub fn add_field_bool(&mut self, name: &'static str, value: bool) -> &mut Self { + self.fields.push((name, value.to_string())); + self + } + + pub fn add_field_i64(&mut self, name: &'static str, value: i64) -> &mut Self { + self.fields.push((name, value.to_string() + "i")); + self + } + + pub fn add_field_f64(&mut self, name: &'static str, value: f64) -> &mut Self { + self.fields.push((name, value.to_string())); + self + } +} + +impl fmt::Display for DataPoint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "datapoint: {}", self.name)?; + for tag in &self.tags { + write!(f, ",{}={}", tag.0, tag.1)?; + } + for field in &self.fields { + write!(f, " {}={}", field.0, field.1)?; + } + Ok(()) + } +} + +#[macro_export] +macro_rules! 
create_datapoint { + (@field $point:ident $name:expr, $string:expr, String) => { + $point.add_field_str($name, &$string); + }; + (@field $point:ident $name:expr, $value:expr, i64) => { + $point.add_field_i64($name, $value as i64); + }; + (@field $point:ident $name:expr, $value:expr, f64) => { + $point.add_field_f64($name, $value as f64); + }; + (@field $point:ident $name:expr, $value:expr, bool) => { + $point.add_field_bool($name, $value as bool); + }; + (@tag $point:ident $tag_name:expr, $tag_value:expr) => { + $point.add_tag($tag_name, &$tag_value); + }; + + (@fields $point:ident) => {}; + + // process optional fields + (@fields $point:ident ($name:expr, $value:expr, Option<$type:ident>) , $($rest:tt)*) => { + if let Some(value) = $value { + $crate::create_datapoint!(@field $point $name, value, $type); + } + $crate::create_datapoint!(@fields $point $($rest)*); + }; + (@fields $point:ident ($name:expr, $value:expr, Option<$type:ident>) $(,)?) => { + if let Some(value) = $value { + $crate::create_datapoint!(@field $point $name, value, $type); + } + }; + + // process tags + (@fields $point:ident $tag_name:expr => $tag_value:expr, $($rest:tt)*) => { + $crate::create_datapoint!(@tag $point $tag_name, $tag_value); + $crate::create_datapoint!(@fields $point $($rest)*); + }; + (@fields $point:ident $tag_name:expr => $tag_value:expr $(,)?) => { + $crate::create_datapoint!(@tag $point $tag_name, $tag_value); + }; + + // process fields + (@fields $point:ident ($name:expr, $value:expr, $type:ident) , $($rest:tt)*) => { + $crate::create_datapoint!(@field $point $name, $value, $type); + $crate::create_datapoint!(@fields $point $($rest)*); + }; + (@fields $point:ident ($name:expr, $value:expr, $type:ident) $(,)?) 
=> { + $crate::create_datapoint!(@field $point $name, $value, $type); + }; + + (@point $name:expr, $($fields:tt)+) => { + { + let mut point = $crate::datapoint::DataPoint::new(&$name); + $crate::create_datapoint!(@fields point $($fields)+); + point + } + }; +} + +#[macro_export] +macro_rules! datapoint { + ($level:expr, $name:expr, $($fields:tt)+) => { + if log::log_enabled!($level) { + $crate::submit($crate::create_datapoint!(@point $name, $($fields)+), $level); + } + }; +} +#[macro_export] +macro_rules! datapoint_error { + ($name:expr, $($fields:tt)+) => { + $crate::datapoint!(log::Level::Error, $name, $($fields)+); + }; +} + +#[macro_export] +macro_rules! datapoint_warn { + ($name:expr, $($fields:tt)+) => { + $crate::datapoint!(log::Level::Warn, $name, $($fields)+); + }; +} + +#[macro_export] +macro_rules! datapoint_info { + ($name:expr, $($fields:tt)+) => { + $crate::datapoint!(log::Level::Info, $name, $($fields)+); + }; +} + +#[macro_export] +macro_rules! datapoint_debug { + ($name:expr, $($fields:tt)+) => { + $crate::datapoint!(log::Level::Debug, $name, $($fields)+); + }; +} + +#[macro_export] +macro_rules! 
datapoint_trace { + ($name:expr, $($fields:tt)+) => { + $crate::datapoint!(log::Level::Trace, $name, $($fields)+); + }; +} + +#[cfg(test)] +mod test { + #[test] + fn test_datapoint() { + datapoint_debug!("name", ("field name", "test", String)); + datapoint_info!("name", ("field name", 12.34_f64, f64)); + datapoint_trace!("name", ("field name", true, bool)); + datapoint_warn!("name", ("field name", 1, i64)); + datapoint_error!("name", ("field name", 1, i64),); + datapoint!( + log::Level::Warn, + "name", + ("field1 name", 2, i64), + ("field2 name", 2, i64) + ); + datapoint_info!("name", ("field1 name", 2, i64), ("field2 name", 2, i64),); + datapoint_trace!( + "name", + ("field1 name", 2, i64), + ("field2 name", 2, i64), + ("field3 name", 3, i64) + ); + datapoint!( + log::Level::Error, + "name", + ("field1 name", 2, i64), + ("field2 name", 2, i64), + ("field3 name", 3, i64), + ); + + let point = create_datapoint!( + @point "name", + ("i64", 1, i64), + ("String", "string space string", String), + ("f64", 12.34_f64, f64), + ("bool", true, bool) + ); + assert_eq!(point.name, "name"); + assert_eq!(point.tags.len(), 0); + assert_eq!(point.fields[0], ("i64", "1i".to_string())); + assert_eq!( + point.fields[1], + ("String", "\"string space string\"".to_string()) + ); + assert_eq!(point.fields[2], ("f64", "12.34".to_string())); + assert_eq!(point.fields[3], ("bool", "true".to_string())); + } + + #[test] + fn test_optional_datapoint() { + datapoint_debug!("name", ("field name", Some("test"), Option)); + datapoint_info!("name", ("field name", Some(12.34_f64), Option)); + datapoint_trace!("name", ("field name", Some(true), Option)); + datapoint_warn!("name", ("field name", Some(1), Option)); + datapoint_error!("name", ("field name", Some(1), Option),); + datapoint_debug!("name", ("field name", None::, Option)); + datapoint_info!("name", ("field name", None::, Option)); + datapoint_trace!("name", ("field name", None::, Option)); + datapoint_warn!("name", ("field name", None::, 
Option)); + datapoint_error!("name", ("field name", None::, Option),); + + let point = create_datapoint!( + @point "name", + ("some_i64", Some(1), Option), + ("no_i64", None::, Option), + ("some_String", Some("string space string"), Option), + ("no_String", None::, Option), + ("some_f64", Some(12.34_f64), Option), + ("no_f64", None::, Option), + ("some_bool", Some(true), Option), + ("no_bool", None::, Option), + ); + assert_eq!(point.name, "name"); + assert_eq!(point.tags.len(), 0); + assert_eq!(point.fields[0], ("some_i64", "1i".to_string())); + assert_eq!( + point.fields[1], + ("some_String", "\"string space string\"".to_string()) + ); + assert_eq!(point.fields[2], ("some_f64", "12.34".to_string())); + assert_eq!(point.fields[3], ("some_bool", "true".to_string())); + assert_eq!(point.fields.len(), 4); + } + + #[test] + fn test_datapoint_with_tags() { + datapoint_debug!("name", "tag" => "tag-value", ("field name", "test", String)); + datapoint_info!( + "name", + "tag" => "tag-value", + "tag2" => "tag-value-2", + ("field name", 12.34_f64, f64) + ); + datapoint_trace!( + "name", + "tag" => "tag-value", + "tag2" => "tag-value-2", + "tag3" => "tag-value-3", + ("field name", true, bool) + ); + datapoint_warn!("name", "tag" => "tag-value"); + datapoint_error!("name", "tag" => "tag-value", ("field name", 1, i64),); + datapoint!( + log::Level::Warn, + "name", + "tag" => "tag-value", + ("field1 name", 2, i64), + ("field2 name", 2, i64) + ); + datapoint_info!("name", ("field1 name", 2, i64), ("field2 name", 2, i64),); + datapoint_trace!( + "name", + "tag" => "tag-value", + ("field1 name", 2, i64), + ("field2 name", 2, i64), + ("field3 name", 3, i64) + ); + datapoint!( + log::Level::Error, + "name", + "tag" => "tag-value", + ("field1 name", 2, i64), + ("field2 name", 2, i64), + ("field3 name", 3, i64), + ); + + let point = create_datapoint!( + @point "name", + "tag1" => "tag-value-1", + "tag2" => "tag-value-2", + "tag3" => "tag-value-3", + ("i64", 1, i64), + ("String", 
"string space string", String), + ("f64", 12.34_f64, f64), + ("bool", true, bool) + ); + assert_eq!(point.name, "name"); + assert_eq!(point.fields[0], ("i64", "1i".to_string())); + assert_eq!( + point.fields[1], + ("String", "\"string space string\"".to_string()) + ); + assert_eq!(point.fields[2], ("f64", "12.34".to_string())); + assert_eq!(point.fields[3], ("bool", "true".to_string())); + assert_eq!(point.tags[0], ("tag1", "tag-value-1".to_string())); + assert_eq!(point.tags[1], ("tag2", "tag-value-2".to_string())); + assert_eq!(point.tags[2], ("tag3", "tag-value-3".to_string())); + } +} diff --git a/svm-metrics/src/lib.rs b/svm-metrics/src/lib.rs new file mode 100644 index 00000000000000..e71c532d42b545 --- /dev/null +++ b/svm-metrics/src/lib.rs @@ -0,0 +1,83 @@ +#![allow(clippy::arithmetic_side_effects)] +pub mod counter; +pub mod datapoint; +pub mod metrics; +pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +// To track an external counter which cannot be reset and is always increasing +#[derive(Default)] +pub struct MovingStat { + value: AtomicU64, +} + +impl MovingStat { + pub fn update_stat(&self, old_value: &MovingStat, new_value: u64) { + let old = old_value.value.swap(new_value, Ordering::Acquire); + self.value + .fetch_add(new_value.saturating_sub(old), Ordering::Release); + } + + pub fn load_and_reset(&self) -> u64 { + self.value.swap(0, Ordering::Acquire) + } +} + +/// A helper that sends the count of created tokens as a datapoint. +#[allow(clippy::redundant_allocation)] +pub struct TokenCounter(Arc<&'static str>); + +impl TokenCounter { + /// Creates a new counter with the specified metrics `name`. + pub fn new(name: &'static str) -> Self { + Self(Arc::new(name)) + } + + /// Creates a new token for this counter. The metric's value will be equal + /// to the number of `CounterToken`s. 
+ pub fn create_token(&self) -> CounterToken { + // new_count = strong_count + // - 1 (in TokenCounter) + // + 1 (token that's being created) + datapoint_info!(*self.0, ("count", Arc::strong_count(&self.0), i64)); + CounterToken(self.0.clone()) + } +} + +/// A token for `TokenCounter`. +#[allow(clippy::redundant_allocation)] +pub struct CounterToken(Arc<&'static str>); + +impl Clone for CounterToken { + fn clone(&self) -> Self { + // new_count = strong_count + // - 1 (in TokenCounter) + // + 1 (token that's being created) + datapoint_info!(*self.0, ("count", Arc::strong_count(&self.0), i64)); + CounterToken(self.0.clone()) + } +} + +impl Drop for CounterToken { + fn drop(&mut self) { + // new_count = strong_count + // - 1 (in TokenCounter, if it still exists) + // - 1 (token that's being dropped) + datapoint_info!( + *self.0, + ("count", Arc::strong_count(&self.0).saturating_sub(2), i64) + ); + } +} + +impl Drop for TokenCounter { + fn drop(&mut self) { + datapoint_info!( + *self.0, + ("count", Arc::strong_count(&self.0).saturating_sub(2), i64) + ); + } +} diff --git a/svm-metrics/src/metrics.rs b/svm-metrics/src/metrics.rs new file mode 100644 index 00000000000000..313fb053a064bd --- /dev/null +++ b/svm-metrics/src/metrics.rs @@ -0,0 +1,745 @@ +//! 
The `metrics` module enables sending measurements to an `InfluxDB` instance + +use { + crate::{counter::CounterPoint, datapoint::DataPoint}, + crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError}, + gethostname::gethostname, + log::*, + solana_cluster_type::ClusterType, + solana_sha256_hasher::hash, + std::{ + cmp, + collections::HashMap, + convert::Into, + env, + fmt::Write, + sync::{Arc, Barrier, Mutex, Once, RwLock}, + thread, + time::{Duration, Instant, UNIX_EPOCH}, + }, + thiserror::Error, +}; + +type CounterMap = HashMap<(&'static str, u64), CounterPoint>; + +#[derive(Debug, Error)] +pub enum MetricsError { + #[error(transparent)] + VarError(#[from] env::VarError), + #[error(transparent)] + ReqwestError(#[from] reqwest::Error), + #[error("SOLANA_METRICS_CONFIG is invalid: '{0}'")] + ConfigInvalid(String), + #[error("SOLANA_METRICS_CONFIG is incomplete")] + ConfigIncomplete, + #[error("SOLANA_METRICS_CONFIG database mismatch: {0}")] + DbMismatch(String), +} + +impl From for String { + fn from(error: MetricsError) -> Self { + error.to_string() + } +} + +impl From<&CounterPoint> for DataPoint { + fn from(counter_point: &CounterPoint) -> Self { + let mut point = Self::new(counter_point.name); + point.timestamp = counter_point.timestamp; + point.add_field_i64("count", counter_point.count); + point + } +} + +#[derive(Debug)] +enum MetricsCommand { + Flush(Arc), + Submit(DataPoint, log::Level), + SubmitCounter(CounterPoint, log::Level, u64), +} + +pub struct MetricsAgent { + sender: Sender, +} + +pub trait MetricsWriter { + // Write the points and empty the vector. Called on the internal + // MetricsAgent worker thread. 
+ fn write(&self, points: Vec); +} + +struct InfluxDbMetricsWriter { + write_url: Option, +} + +impl InfluxDbMetricsWriter { + fn new() -> Self { + Self { + write_url: Self::build_write_url().ok(), + } + } + + fn build_write_url() -> Result { + let config = get_metrics_config().map_err(|err| { + info!("metrics disabled: {}", err); + err + })?; + + info!( + "metrics configuration: host={} db={} username={}", + config.host, config.db, config.username + ); + + let write_url = format!( + "{}/write?db={}&u={}&p={}&precision=n", + &config.host, &config.db, &config.username, &config.password + ); + + Ok(write_url) + } +} + +pub fn serialize_points(points: &Vec, host_id: &str) -> String { + const TIMESTAMP_LEN: usize = 20; + const HOST_ID_LEN: usize = 8; // "host_id=".len() + const EXTRA_LEN: usize = 2; // "=,".len() + let mut len = 0; + for point in points { + for (name, value) in &point.fields { + len += name.len() + value.len() + EXTRA_LEN; + } + for (name, value) in &point.tags { + len += name.len() + value.len() + EXTRA_LEN; + } + len += point.name.len(); + len += TIMESTAMP_LEN; + len += host_id.len() + HOST_ID_LEN; + } + let mut line = String::with_capacity(len); + for point in points { + let _ = write!(line, "{},host_id={}", &point.name, host_id); + for (name, value) in point.tags.iter() { + let _ = write!(line, ",{name}={value}"); + } + + let mut first = true; + for (name, value) in point.fields.iter() { + let _ = write!(line, "{}{}={}", if first { ' ' } else { ',' }, name, value); + first = false; + } + let timestamp = point.timestamp.duration_since(UNIX_EPOCH); + let nanos = timestamp.unwrap().as_nanos(); + let _ = writeln!(line, " {nanos}"); + } + line +} + +impl MetricsWriter for InfluxDbMetricsWriter { + fn write(&self, points: Vec) { + if let Some(ref write_url) = self.write_url { + debug!("submitting {} points", points.len()); + + let host_id = HOST_ID.read().unwrap(); + + let line = serialize_points(&points, &host_id); + + let client = 
reqwest::blocking::Client::builder() + .timeout(Duration::from_secs(5)) + .build(); + let client = match client { + Ok(client) => client, + Err(err) => { + warn!("client instantiation failed: {}", err); + return; + } + }; + + let response = client.post(write_url.as_str()).body(line).send(); + if let Ok(resp) = response { + let status = resp.status(); + if !status.is_success() { + let text = resp + .text() + .unwrap_or_else(|_| "[text body empty]".to_string()); + warn!("submit response unsuccessful: {} {}", status, text,); + } + } else { + warn!("submit error: {}", response.unwrap_err()); + } + } + } +} + +impl Default for MetricsAgent { + fn default() -> Self { + let max_points_per_sec = env::var("SOLANA_METRICS_MAX_POINTS_PER_SECOND") + .map(|x| { + x.parse() + .expect("Failed to parse SOLANA_METRICS_MAX_POINTS_PER_SECOND") + }) + .unwrap_or(4000); + + Self::new( + Arc::new(InfluxDbMetricsWriter::new()), + Duration::from_secs(10), + max_points_per_sec, + ) + } +} + +impl MetricsAgent { + pub fn new( + writer: Arc, + write_frequency: Duration, + max_points_per_sec: usize, + ) -> Self { + let (sender, receiver) = unbounded::(); + + thread::Builder::new() + .name("solMetricsAgent".into()) + .spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec)) + .unwrap(); + + Self { sender } + } + + // Combines `points` and `counters` into a single array of `DataPoint`s, appending a data point + // with the metrics stats at the end. + // + // Limits the number of produced points to the `max_points` value. Takes `points` followed by + // `counters`, dropping `counters` first. + // + // `max_points_per_sec` is only used in a warning message. + // `points_buffered` is used in the stats. + fn combine_points( + max_points: usize, + max_points_per_sec: usize, + secs_since_last_write: u64, + points_buffered: usize, + points: &mut Vec, + counters: &mut CounterMap, + ) -> Vec { + // Reserve one slot for the stats point we will add at the end. 
+ let max_points = max_points.saturating_sub(1); + + let num_points = points.len().saturating_add(counters.len()); + let fit_counters = max_points.saturating_sub(points.len()); + let points_written = cmp::min(num_points, max_points); + + debug!("run: attempting to write {} points", num_points); + + if num_points > max_points { + warn!( + "Max submission rate of {} datapoints per second exceeded. Only the \ + first {} of {} points will be submitted.", + max_points_per_sec, max_points, num_points + ); + } + + let mut combined = std::mem::take(points); + combined.truncate(points_written); + + combined.extend(counters.values().take(fit_counters).map(|v| v.into())); + counters.clear(); + + combined.push( + DataPoint::new("metrics") + .add_field_i64("points_written", points_written as i64) + .add_field_i64("num_points", num_points as i64) + .add_field_i64("points_lost", (num_points - points_written) as i64) + .add_field_i64("points_buffered", points_buffered as i64) + .add_field_i64("secs_since_last_write", secs_since_last_write as i64) + .to_owned(), + ); + + combined + } + + // Consumes provided `points`, sending up to `max_points` of them into the `writer`. + // + // Returns an updated value for `last_write_time`. Which is equal to `Instant::now()`, just + // before `write` in updated. 
+ fn write( + writer: &Arc, + max_points: usize, + max_points_per_sec: usize, + last_write_time: Instant, + points_buffered: usize, + points: &mut Vec, + counters: &mut CounterMap, + ) -> Instant { + let now = Instant::now(); + let secs_since_last_write = now.duration_since(last_write_time).as_secs(); + + writer.write(Self::combine_points( + max_points, + max_points_per_sec, + secs_since_last_write, + points_buffered, + points, + counters, + )); + + now + } + + fn run( + receiver: &Receiver, + writer: &Arc, + write_frequency: Duration, + max_points_per_sec: usize, + ) { + trace!("run: enter"); + let mut last_write_time = Instant::now(); + let mut points = Vec::::new(); + let mut counters = CounterMap::new(); + + let max_points = write_frequency.as_secs() as usize * max_points_per_sec; + + // Bind common arguments in the `Self::write()` call. + let write = |last_write_time: Instant, + points: &mut Vec, + counters: &mut CounterMap| + -> Instant { + Self::write( + writer, + max_points, + max_points_per_sec, + last_write_time, + receiver.len(), + points, + counters, + ) + }; + + loop { + match receiver.try_recv() { + Ok(cmd) => match cmd { + MetricsCommand::Flush(barrier) => { + debug!("metrics_thread: flush"); + last_write_time = write(last_write_time, &mut points, &mut counters); + barrier.wait(); + } + MetricsCommand::Submit(point, level) => { + log!(level, "{}", point); + points.push(point); + } + MetricsCommand::SubmitCounter(counter, _level, bucket) => { + debug!("{:?}", counter); + let key = (counter.name, bucket); + if let Some(value) = counters.get_mut(&key) { + value.count += counter.count; + } else { + counters.insert(key, counter); + } + } + }, + Err(TryRecvError::Empty) => { + std::thread::sleep(Duration::from_millis(5)); + } + Err(TryRecvError::Disconnected) => { + debug!("run: sender disconnected"); + break; + } + }; + + let now = Instant::now(); + if now.duration_since(last_write_time) >= write_frequency { + last_write_time = write(last_write_time, &mut 
points, &mut counters); + } + } + + debug_assert!( + points.is_empty() && counters.is_empty(), + "Controlling `MetricsAgent` is expected to call `flush()` from the `Drop` \n\ + implementation, before exiting. So both `points` and `counters` must be empty at \n\ + this point.\n\ + `points`: {points:?}\n\ + `counters`: {counters:?}", + ); + + trace!("run: exit"); + } + + pub fn submit(&self, point: DataPoint, level: log::Level) { + self.sender + .send(MetricsCommand::Submit(point, level)) + .unwrap(); + } + + pub fn submit_counter(&self, counter: CounterPoint, level: log::Level, bucket: u64) { + self.sender + .send(MetricsCommand::SubmitCounter(counter, level, bucket)) + .unwrap(); + } + + pub fn flush(&self) { + debug!("Flush"); + let barrier = Arc::new(Barrier::new(2)); + self.sender + .send(MetricsCommand::Flush(Arc::clone(&barrier))) + .unwrap(); + + barrier.wait(); + } +} + +impl Drop for MetricsAgent { + fn drop(&mut self) { + self.flush(); + } +} + +fn get_singleton_agent() -> &'static MetricsAgent { + static AGENT: std::sync::LazyLock = + std::sync::LazyLock::new(MetricsAgent::default); + &AGENT +} + +static HOST_ID: std::sync::LazyLock> = std::sync::LazyLock::new(|| { + RwLock::new({ + let hostname: String = gethostname() + .into_string() + .unwrap_or_else(|_| "".to_string()); + format!("{}", hash(hostname.as_bytes())) + }) +}); + +pub fn set_host_id(host_id: String) { + info!("host id: {}", host_id); + *HOST_ID.write().unwrap() = host_id; +} + +/// Submits a new point from any thread. Note that points are internally queued +/// and transmitted periodically in batches. +pub fn submit(point: DataPoint, level: log::Level) { + let agent = get_singleton_agent(); + agent.submit(point, level); +} + +/// Submits a new counter or updates an existing counter from any thread. Note that points are +/// internally queued and transmitted periodically in batches. 
+pub(crate) fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) { + let agent = get_singleton_agent(); + agent.submit_counter(point, level, bucket); +} + +#[derive(Debug, Default)] +struct MetricsConfig { + pub host: String, + pub db: String, + pub username: String, + pub password: String, +} + +impl MetricsConfig { + fn complete(&self) -> bool { + !(self.host.is_empty() + || self.db.is_empty() + || self.username.is_empty() + || self.password.is_empty()) + } +} + +fn get_metrics_config() -> Result { + let mut config = MetricsConfig::default(); + let config_var = env::var("SOLANA_METRICS_CONFIG")?; + if config_var.is_empty() { + Err(env::VarError::NotPresent)?; + } + + for pair in config_var.split(',') { + let nv: Vec<_> = pair.split('=').collect(); + if nv.len() != 2 { + return Err(MetricsError::ConfigInvalid(pair.to_string())); + } + let v = nv[1].to_string(); + match nv[0] { + "host" => config.host = v, + "db" => config.db = v, + "u" => config.username = v, + "p" => config.password = v, + _ => return Err(MetricsError::ConfigInvalid(pair.to_string())), + } + } + + if !config.complete() { + return Err(MetricsError::ConfigIncomplete); + } + + Ok(config) +} + +pub fn metrics_config_sanity_check(cluster_type: ClusterType) -> Result<(), MetricsError> { + let config = match get_metrics_config() { + Ok(config) => config, + Err(MetricsError::VarError(env::VarError::NotPresent)) => return Ok(()), + Err(e) => return Err(e), + }; + match &config.db[..] 
{ + "mainnet-beta" if cluster_type != ClusterType::MainnetBeta => (), + "tds" if cluster_type != ClusterType::Testnet => (), + "devnet" if cluster_type != ClusterType::Devnet => (), + _ => return Ok(()), + }; + let (host, db) = (&config.host, &config.db); + let msg = format!("cluster_type={cluster_type:?} host={host} database={db}"); + Err(MetricsError::DbMismatch(msg)) +} + +pub fn query(q: &str) -> Result { + let config = get_metrics_config()?; + let query_url = format!( + "{}/query?u={}&p={}&q={}", + &config.host, &config.username, &config.password, &q + ); + + let response = reqwest::blocking::get(query_url.as_str())?.text()?; + + Ok(response) +} + +/// Blocks until all pending points from previous calls to `submit` have been +/// transmitted. +pub fn flush() { + let agent = get_singleton_agent(); + agent.flush(); +} + +/// Hook the panic handler to generate a data point on each panic +pub fn set_panic_hook(program: &'static str, version: Option) { + static SET_HOOK: Once = Once::new(); + SET_HOOK.call_once(|| { + let default_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |ono| { + default_hook(ono); + let location = match ono.location() { + Some(location) => location.to_string(), + None => "?".to_string(), + }; + submit( + DataPoint::new("panic") + .add_field_str("program", program) + .add_field_str("thread", thread::current().name().unwrap_or("?")) + // The 'one' field exists to give Kapacitor Alerts a numerical value + // to filter on + .add_field_i64("one", 1) + .add_field_str("message", &ono.to_string()) + .add_field_str("location", &location) + .add_field_str("version", version.as_ref().unwrap_or(&"".to_string())) + .to_owned(), + Level::Error, + ); + // Flush metrics immediately + flush(); + + // Exit cleanly so the process don't limp along in a half-dead state + std::process::exit(1); + })); + }); +} + +pub mod test_mocks { + use super::*; + + pub struct MockMetricsWriter { + pub points_written: Arc>>, + } + impl MockMetricsWriter 
{ + pub fn new() -> Self { + MockMetricsWriter { + points_written: Arc::new(Mutex::new(Vec::new())), + } + } + + pub fn points_written(&self) -> usize { + self.points_written.lock().unwrap().len() + } + } + + impl Default for MockMetricsWriter { + fn default() -> Self { + Self::new() + } + } + + impl MetricsWriter for MockMetricsWriter { + fn write(&self, points: Vec) { + assert!(!points.is_empty()); + + let new_points = points.len(); + self.points_written.lock().unwrap().extend(points); + + info!( + "Writing {} points ({} total)", + new_points, + self.points_written(), + ); + } + } +} + +#[cfg(test)] +mod test { + use {super::*, test_mocks::MockMetricsWriter}; + + #[test] + fn test_submit() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000); + + for i in 0..42 { + agent.submit( + DataPoint::new("measurement") + .add_field_i64("i", i) + .to_owned(), + Level::Info, + ); + } + + agent.flush(); + assert_eq!(writer.points_written(), 43); + } + + #[test] + fn test_submit_counter() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000); + + for i in 0..10 { + agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i); + agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i); + } + + agent.flush(); + assert_eq!(writer.points_written(), 21); + } + + #[test] + fn test_submit_counter_increment() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000); + + for _ in 0..10 { + agent.submit_counter( + CounterPoint { + name: "counter", + count: 10, + timestamp: UNIX_EPOCH, + }, + Level::Info, + 0, // use the same bucket + ); + } + + agent.flush(); + assert_eq!(writer.points_written(), 2); + + let submitted_point = writer.points_written.lock().unwrap()[0].clone(); + assert_eq!(submitted_point.fields[0], ("count", 
"100i".to_string())); + } + + #[test] + fn test_submit_bucketed_counter() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000); + + for i in 0..50 { + agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i / 10); + agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i / 10); + } + + agent.flush(); + assert_eq!(writer.points_written(), 11); + } + + #[test] + fn test_submit_with_delay() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000); + + agent.submit(DataPoint::new("point 1"), Level::Info); + thread::sleep(Duration::from_secs(2)); + assert_eq!(writer.points_written(), 2); + } + + #[test] + fn test_submit_exceed_max_rate() { + let writer = Arc::new(MockMetricsWriter::new()); + + let max_points_per_sec = 100; + + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), max_points_per_sec); + + for i in 0..(max_points_per_sec + 20) { + agent.submit( + DataPoint::new("measurement") + .add_field_i64("i", i.try_into().unwrap()) + .to_owned(), + Level::Info, + ); + } + + agent.flush(); + + // We are expecting `max_points_per_sec - 1` data points from `submit()` and one more metric + // stats data points. 
+ assert_eq!(writer.points_written(), max_points_per_sec); + } + + #[test] + fn test_multithread_submit() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = Arc::new(Mutex::new(MetricsAgent::new( + writer.clone(), + Duration::from_secs(10), + 1000, + ))); + + // + // Submit measurements from different threads + // + let mut threads = Vec::new(); + for i in 0..42 { + let mut point = DataPoint::new("measurement"); + point.add_field_i64("i", i); + let agent = Arc::clone(&agent); + threads.push(thread::spawn(move || { + agent.lock().unwrap().submit(point, Level::Info); + })); + } + + for thread in threads { + thread.join().unwrap(); + } + + agent.lock().unwrap().flush(); + assert_eq!(writer.points_written(), 43); + } + + #[test] + fn test_flush_before_drop() { + let writer = Arc::new(MockMetricsWriter::new()); + { + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9_999_999), 1000); + agent.submit(DataPoint::new("point 1"), Level::Info); + } + + // The datapoints we expect to see are: + // 1. `point 1` from the above. + // 2. `metrics` stats submitted as a result of the `Flush` sent by `agent` being destroyed. + assert_eq!(writer.points_written(), 2); + } + + #[test] + fn test_live_submit() { + let agent = MetricsAgent::default(); + + let point = DataPoint::new("live_submit_test") + .add_field_bool("true", true) + .add_field_bool("random_bool", rand::random::() < 128) + .add_field_i64("random_int", rand::random::() as i64) + .to_owned(); + agent.submit(point, Level::Info); + } +}