diff --git a/Cargo.lock b/Cargo.lock index ecad0a3ed..afd175bb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -543,12 +543,15 @@ dependencies = [ "actix-web 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "mockito 0.17.1 (registry+https://github.com/rust-lang/crates.io-index)", "prometheus 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.93 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "twoway 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/commons/Cargo.toml b/commons/Cargo.toml index e1c2438da..9acaf7248 100644 --- a/commons/Cargo.toml +++ b/commons/Cargo.toml @@ -15,3 +15,9 @@ serde = "^1.0.70" serde_json = "^1.0.34" tokio = "^0.1" url = "^1.7.2" +futures = "^0.1" + +[dev-dependencies] +tokio = "^0.1" +twoway = "^0.2" +mockito = "^0.17.1" \ No newline at end of file diff --git a/commons/src/lib.rs b/commons/src/lib.rs index 2547f0aaf..2c1cec7ff 100644 --- a/commons/src/lib.rs +++ b/commons/src/lib.rs @@ -17,11 +17,11 @@ mod config; pub use crate::config::MergeOptions; pub mod de; +pub mod metrics; +pub mod testing; mod errors; -pub use crate::errors::{register_metrics, GraphError, MISSING_APPSTATE_PANIC_MSG}; - -pub mod testing; +pub use errors::{register_metrics, GraphError, MISSING_APPSTATE_PANIC_MSG}; use actix_web::http::header; use std::collections::HashSet; diff --git a/commons/src/metrics.rs b/commons/src/metrics.rs new file mode 100644 index 000000000..fa3f2df42 --- /dev/null +++ b/commons/src/metrics.rs @@ -0,0 +1,104 @@ +//! Metrics service. + +use actix_web::{HttpRequest, HttpResponse}; +use failure::Fallible; +use futures::future; +use futures::prelude::*; +use prometheus::{self, Registry}; + +/// For types that store a static Registry reference +pub trait HasRegistry { + /// Get the static registry reference + fn registry(&self) -> &'static Registry; +} + +/// Minimally wraps a Registry for implementing `HasRegistry`. +pub struct RegistryWrapper(pub &'static Registry); + +impl HasRegistry for RegistryWrapper { + fn registry(&self) -> &'static Registry { + self.0 + } +} + +/// Serve metrics requests (Prometheus textual format). +pub fn serve(req: HttpRequest) -> Box> +where + T: 'static + HasRegistry, +{ + use prometheus::Encoder; + + let registry: &Registry = match req.app_data::() { + Some(t) => t.registry(), + None => { + return Box::new(futures::future::err(failure::err_msg( + "could not get registry from app_data", + ))) + } + }; + + let resp = future::ok(registry.gather()) + .and_then(|metrics| { + let tenc = prometheus::TextEncoder::new(); + let mut buf = vec![]; + tenc.encode(&metrics, &mut buf).and(Ok(buf)) + }) + .from_err() + .map(|content| HttpResponse::Ok().body(content)); + Box::new(resp) +} + +/// Create a custom Prometheus registry. +pub fn new_registry(prefix: Option) -> Fallible { + Registry::new_custom(prefix.clone(), None).map_err(|e| { + failure::err_msg(format!( + "could not create a custom regostry with prefix {:?}: {}", + prefix, + e.to_string() + )) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::testing; + use actix_web::test::TestRequest; + + #[test] + fn serve_metrics_basic() -> Fallible<()> { + let mut rt = testing::init_runtime()?; + + let metrics_prefix = "cincinnati"; + let registry_wrapped = RegistryWrapper(Box::leak(Box::new(new_registry(Some( + metrics_prefix.to_string(), + ))?))); + + testing::dummy_gauge(®istry_wrapped.0, 42.0)?; + + let http_req = TestRequest::default() + .data(registry_wrapped) + .to_http_request(); + + let metrics_call = serve::(http_req); + let resp = rt.block_on(metrics_call)?; + + assert_eq!(resp.status(), 200); + if let actix_web::body::ResponseBody::Body(body) = resp.body() { + if let actix_web::body::Body::Bytes(bytes) = body { + assert!(!bytes.is_empty()); + assert!(twoway::find_bytes( + bytes.as_ref(), + format!("{}_dummy_gauge 42\n", metrics_prefix).as_bytes() + ) + .is_some()); + } else { + bail!("expected Body") + } + } else { + bail!("expected bytes in body") + }; + + Ok(()) + } +} diff --git a/graph-builder/src/graph.rs b/graph-builder/src/graph.rs index 7c04596fb..e3e9de4bb 100644 --- a/graph-builder/src/graph.rs +++ b/graph-builder/src/graph.rs @@ -17,6 +17,7 @@ use crate::registry::{self, Registry}; use actix_web::{HttpRequest, HttpResponse}; use cincinnati::plugins::prelude::*; use cincinnati::{AbstractRelease, Graph, Release, CONTENT_TYPE}; +use commons::metrics::HasRegistry; use commons::GraphError; use failure::{Error, Fallible}; use futures::Future; @@ -105,6 +106,7 @@ pub struct State { live: Arc>, ready: Arc>, plugins: &'static [BoxedPlugin], + registry: &'static prometheus::Registry, } impl State { @@ -115,6 +117,7 @@ impl State { live: Arc>, ready: Arc>, plugins: &'static [BoxedPlugin], + registry: &'static prometheus::Registry, ) -> State { State { json, @@ -122,6 +125,7 @@ impl State { live, ready, plugins, + registry, } } @@ -136,6 +140,12 @@ impl State { } } +impl HasRegistry for State { + fn registry(&self) -> &'static prometheus::Registry { + self.registry + } +} + #[allow(clippy::useless_let_if_seq)] pub fn run<'a>(settings: &'a config::AppSettings, state: &State) -> ! { // Grow-only cache, mapping tag (hashed layers) to optional release metadata. diff --git a/graph-builder/src/main.rs b/graph-builder/src/main.rs index 25a566f9f..79539e629 100644 --- a/graph-builder/src/main.rs +++ b/graph-builder/src/main.rs @@ -22,18 +22,26 @@ extern crate structopt; extern crate tempfile; use crate::failure::ResultExt; -use graph_builder::{config, graph, graph::RwLock, status}; - use actix_web::{App, HttpServer}; use cincinnati::plugins::prelude::*; +use commons::metrics::{self, HasRegistry}; use failure::Error; +use graph_builder::{config, graph, graph::RwLock, status}; use std::sync::Arc; use std::thread; +/// Common prefix for graph-builder metrics. +pub static METRICS_PREFIX: &str = "cincinnati_gb"; + fn main() -> Result<(), Error> { let sys = actix::System::new("graph-builder"); let settings = config::AppSettings::assemble().context("could not assemble AppSettings")?; + env_logger::Builder::from_default_env() + .filter(Some(module_path!()), settings.verbosity) + .init(); + debug!("application settings:\n{:#?}", settings); + let plugins: Vec = if settings.disable_quay_api_metadata { Default::default() } else { @@ -69,13 +77,14 @@ fn main() -> Result<(), Error> { }) ) }; + let registry: prometheus::Registry = metrics::new_registry(Some(METRICS_PREFIX.to_string()))?; - env_logger::Builder::from_default_env() - .filter(Some(module_path!()), settings.verbosity) - .init(); - debug!("application settings:\n{:#?}", settings); + let service_addr = (settings.address, settings.port); + let status_addr = (settings.status_address, settings.status_port); + let app_prefix = settings.path_prefix.clone(); - let app_state = { + // Shared state. + let state = { let json_graph = Arc::new(RwLock::new(String::new())); let live = Arc::new(RwLock::new(false)); let ready = Arc::new(RwLock::new(false)); @@ -86,20 +95,18 @@ fn main() -> Result<(), Error> { live.clone(), ready.clone(), Box::leak(Box::new(plugins)), + Box::leak(Box::new(registry)), ) }; - let service_addr = (settings.address, settings.port); - let status_addr = (settings.status_address, settings.status_port); - let app_prefix = settings.path_prefix.clone(); - // Graph scraper - let graph_state = app_state.clone(); + let graph_state = state.clone(); thread::spawn(move || graph::run(&settings, &graph_state)); // Status service. - graph::register_metrics(&status::PROM_REGISTRY)?; - let status_state = app_state.clone(); + graph::register_metrics(state.registry())?; + + let status_state = state.clone(); HttpServer::new(move || { App::new() .register_data(actix_web::web::Data::new(status_state.clone())) @@ -109,7 +116,7 @@ fn main() -> Result<(), Error> { ) .service( actix_web::web::resource("/metrics") - .route(actix_web::web::get().to(status::serve_metrics)), + .route(actix_web::web::get().to(metrics::serve::)), ) .service( actix_web::web::resource("/readiness") @@ -120,7 +127,7 @@ fn main() -> Result<(), Error> { .start(); // Main service. - let main_state = app_state.clone(); + let main_state = state.clone(); HttpServer::new(move || { App::new() .register_data(actix_web::web::Data::new(main_state.clone())) @@ -136,3 +143,68 @@ fn main() -> Result<(), Error> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::graph::State; + use actix_web::test::TestRequest; + use commons::metrics::HasRegistry; + use commons::testing; + use failure::{bail, Fallible}; + use parking_lot::RwLock; + use prometheus::Registry; + use std::collections::HashSet; + use std::sync::Arc; + + fn mock_state() -> State { + let json_graph = Arc::new(RwLock::new(String::new())); + let live = Arc::new(RwLock::new(false)); + let ready = Arc::new(RwLock::new(false)); + + let plugins = Box::leak(Box::new([])); + let registry: &'static Registry = Box::leak(Box::new( + metrics::new_registry(Some(METRICS_PREFIX.to_string())).unwrap(), + )); + + State::new( + json_graph.clone(), + HashSet::new(), + live.clone(), + ready.clone(), + plugins, + registry, + ) + } + + #[test] + fn serve_metrics_basic() -> Fallible<()> { + let mut rt = testing::init_runtime()?; + let state = mock_state(); + + let registry = ::registry(&state); + graph::register_metrics(registry)?; + testing::dummy_gauge(registry, 42.0)?; + + let http_req = TestRequest::default().data(state).to_http_request(); + let metrics_call = metrics::serve::(http_req); + let resp = rt.block_on(metrics_call)?; + + assert_eq!(resp.status(), 200); + if let actix_web::body::ResponseBody::Body(body) = resp.body() { + if let actix_web::body::Body::Bytes(bytes) = body { + assert!(!bytes.is_empty()); + println!("{:?}", std::str::from_utf8(bytes.as_ref())); + assert!( + twoway::find_bytes(bytes.as_ref(), b"cincinnati_gb_dummy_gauge 42\n").is_some() + ); + } else { + bail!("expected Body") + } + } else { + bail!("expected bytes in body") + }; + + Ok(()) + } +} diff --git a/graph-builder/src/status.rs b/graph-builder/src/status.rs index 8d894125c..095d0669b 100644 --- a/graph-builder/src/status.rs +++ b/graph-builder/src/status.rs @@ -4,34 +4,6 @@ use crate::graph::State; use actix_web::{HttpRequest, HttpResponse}; use futures::future; use futures::prelude::*; -use prometheus; - -/// Common prefix for graph-builder metrics. -static GB_METRICS_PREFIX: &str = "cincinnati_gb"; - -lazy_static! { - /// Metrics registry. - pub static ref PROM_REGISTRY: prometheus::Registry = - prometheus::Registry::new_custom(Some(GB_METRICS_PREFIX.to_string()), None) - .expect("could not create metrics registry"); -} - -/// Expose metrics (Prometheus textual format). -pub fn serve_metrics( - _req: HttpRequest, -) -> Box> { - use prometheus::Encoder; - - let resp = future::ok(PROM_REGISTRY.gather()) - .and_then(|metrics| { - let tenc = prometheus::TextEncoder::new(); - let mut buf = vec![]; - tenc.encode(&metrics, &mut buf).and(Ok(buf)) - }) - .from_err() - .map(|content| HttpResponse::Ok().body(content)); - Box::new(resp) -} /// Expose liveness status. /// @@ -72,56 +44,3 @@ pub fn serve_readiness( }; Box::new(future::ok(resp)) } - -#[cfg(test)] -mod tests { - use crate::graph::State; - use crate::status; - use actix_web::test::TestRequest; - use commons::testing; - use failure::Fallible; - use parking_lot::RwLock; - use std::collections::HashSet; - use std::sync::Arc; - - fn mock_state() -> State { - let json_graph = Arc::new(RwLock::new(String::new())); - let live = Arc::new(RwLock::new(false)); - let ready = Arc::new(RwLock::new(false)); - - State::new( - json_graph.clone(), - HashSet::new(), - live.clone(), - ready.clone(), - Box::leak(Box::new([])), - ) - } - - #[test] - fn serve_metrics_basic() -> Fallible<()> { - let mut rt = testing::init_runtime()?; - testing::dummy_gauge(&status::PROM_REGISTRY, 42.0)?; - - let http_req = TestRequest::default().data(mock_state()).to_http_request(); - let metrics_call = status::serve_metrics(http_req); - let resp = rt.block_on(metrics_call)?; - - assert_eq!(resp.status(), 200); - if let actix_web::body::ResponseBody::Body(body) = resp.body() { - if let actix_web::body::Body::Bytes(bytes) = body { - assert!(!bytes.is_empty()); - println!("{:?}", std::str::from_utf8(bytes.as_ref())); - assert!( - twoway::find_bytes(bytes.as_ref(), b"cincinnati_gb_dummy_gauge 42\n").is_some() - ); - } else { - bail!("expected Body") - } - } else { - bail!("expected bytes in body") - }; - - Ok(()) - } -} diff --git a/policy-engine/src/main.rs b/policy-engine/src/main.rs index ac59984f8..c1218f344 100644 --- a/policy-engine/src/main.rs +++ b/policy-engine/src/main.rs @@ -32,36 +32,45 @@ extern crate tempfile; extern crate url; mod config; mod graph; -mod metrics; mod openapi; use actix_web::{App, HttpServer}; use cincinnati::plugins::BoxedPlugin; +use commons::metrics::{self, RegistryWrapper}; use failure::Error; +use prometheus::Registry; use std::collections::HashSet; +/// Common prefix for policy-engine metrics. +pub static METRICS_PREFIX: &str = "cincinnati_pe"; + fn main() -> Result<(), Error> { let sys = actix::System::new("policy-engine"); let settings = config::AppSettings::assemble()?; - let plugins = settings.policy_plugins()?; - env_logger::Builder::from_default_env() .filter(Some(module_path!()), settings.verbosity) .init(); debug!("application settings:\n{:#?}", &settings); // Metrics service. - graph::register_metrics(&metrics::PROM_REGISTRY)?; - HttpServer::new(|| { - App::new().service( - actix_web::web::resource("/metrics").route(actix_web::web::get().to(metrics::serve)), - ) + let registry: &'static Registry = Box::leak(Box::new(metrics::new_registry(Some( + METRICS_PREFIX.to_string(), + ))?)); + graph::register_metrics(registry)?; + HttpServer::new(move || { + App::new() + .register_data(actix_web::web::Data::new(RegistryWrapper(registry))) + .service( + actix_web::web::resource("/metrics") + .route(actix_web::web::get().to(metrics::serve::)), + ) }) .bind((settings.status_address, settings.status_port))? .start(); // Main service. + let plugins = settings.policy_plugins()?; let state = AppState { mandatory_params: settings.mandatory_client_parameters.clone(), upstream: settings.upstream.clone(), diff --git a/policy-engine/src/metrics.rs b/policy-engine/src/metrics.rs deleted file mode 100644 index 72461ce46..000000000 --- a/policy-engine/src/metrics.rs +++ /dev/null @@ -1,67 +0,0 @@ -//! Metrics service. - -use actix_web::{HttpRequest, HttpResponse}; -use futures::future; -use futures::prelude::*; -use prometheus; - -/// Common prefix for policy-engine metrics. -static PE_METRICS_PREFIX: &str = "cincinnati_pe"; - -lazy_static! { - /// Metrics registry. - pub(crate) static ref PROM_REGISTRY: prometheus::Registry = - prometheus::Registry::new_custom(Some(PE_METRICS_PREFIX.to_string()), None) - .expect("could not create metrics registry"); -} - -/// Serve metrics requests (Prometheus textual format). -pub(crate) fn serve( - _req: HttpRequest, -) -> Box> { - use prometheus::Encoder; - - let resp = future::ok(PROM_REGISTRY.gather()) - .and_then(|metrics| { - let tenc = prometheus::TextEncoder::new(); - let mut buf = vec![]; - tenc.encode(&metrics, &mut buf).and(Ok(buf)) - }) - .from_err() - .map(|content| HttpResponse::Ok().body(content)); - Box::new(resp) -} - -#[cfg(test)] -mod tests { - use crate::metrics; - use actix_web::test::TestRequest; - use commons::testing; - use failure::Fallible; - - #[test] - fn serve_metrics_basic() -> Fallible<()> { - let mut rt = testing::init_runtime()?; - testing::dummy_gauge(&metrics::PROM_REGISTRY, 42.0)?; - - let http_req = TestRequest::default().to_http_request(); - let metrics_call = metrics::serve(http_req); - let resp = rt.block_on(metrics_call)?; - - assert_eq!(resp.status(), 200); - if let actix_web::body::ResponseBody::Body(body) = resp.body() { - if let actix_web::body::Body::Bytes(bytes) = body { - assert!(!bytes.is_empty()); - assert!( - twoway::find_bytes(bytes.as_ref(), b"cincinnati_pe_dummy_gauge 42\n").is_some() - ); - } else { - bail!("expected Body") - } - } else { - bail!("expected bytes in body") - }; - - Ok(()) - } -}