Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ serde = "^1.0.70"
serde_json = "^1.0.34"
tokio = "^0.1"
url = "^1.7.2"
futures = "^0.1"

[dev-dependencies]
tokio = "^0.1"
twoway = "^0.2"
mockito = "^0.17.1"
6 changes: 3 additions & 3 deletions commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ mod config;
pub use crate::config::MergeOptions;

pub mod de;
pub mod metrics;
pub mod testing;

mod errors;
pub use crate::errors::{register_metrics, GraphError, MISSING_APPSTATE_PANIC_MSG};

pub mod testing;
pub use errors::{register_metrics, GraphError, MISSING_APPSTATE_PANIC_MSG};

use actix_web::http::header;
use std::collections::HashSet;
Expand Down
104 changes: 104 additions & 0 deletions commons/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Metrics service.

use actix_web::{HttpRequest, HttpResponse};
use failure::Fallible;
use futures::future;
use futures::prelude::*;
use prometheus::{self, Registry};

/// For types that store a static Registry reference
pub trait HasRegistry {
/// Get the static registry reference
fn registry(&self) -> &'static Registry;
}

/// Minimally wraps a Registry for implementing `HasRegistry`.
pub struct RegistryWrapper(pub &'static Registry);

impl HasRegistry for RegistryWrapper {
fn registry(&self) -> &'static Registry {
self.0
}
}

/// Serve metrics requests (Prometheus textual format).
pub fn serve<T>(req: HttpRequest) -> Box<dyn Future<Item = HttpResponse, Error = failure::Error>>
where
T: 'static + HasRegistry,
{
use prometheus::Encoder;

let registry: &Registry = match req.app_data::<T>() {
Some(t) => t.registry(),
None => {
return Box::new(futures::future::err(failure::err_msg(
"could not get registry from app_data",
)))
}
};

let resp = future::ok(registry.gather())
.and_then(|metrics| {
let tenc = prometheus::TextEncoder::new();
let mut buf = vec![];
tenc.encode(&metrics, &mut buf).and(Ok(buf))
})
.from_err()
.map(|content| HttpResponse::Ok().body(content));
Box::new(resp)
}

/// Create a custom Prometheus registry.
pub fn new_registry(prefix: Option<String>) -> Fallible<Registry> {
Registry::new_custom(prefix.clone(), None).map_err(|e| {
failure::err_msg(format!(
"could not create a custom regostry with prefix {:?}: {}",
prefix,
e.to_string()
))
})
}

#[cfg(test)]
mod tests {
use super::*;
use crate::testing;
use actix_web::test::TestRequest;

#[test]
fn serve_metrics_basic() -> Fallible<()> {
let mut rt = testing::init_runtime()?;

let metrics_prefix = "cincinnati";
let registry_wrapped = RegistryWrapper(Box::leak(Box::new(new_registry(Some(
metrics_prefix.to_string(),
))?)));

testing::dummy_gauge(&registry_wrapped.0, 42.0)?;

let http_req = TestRequest::default()
.data(registry_wrapped)
.to_http_request();

let metrics_call = serve::<RegistryWrapper>(http_req);
let resp = rt.block_on(metrics_call)?;

assert_eq!(resp.status(), 200);
if let actix_web::body::ResponseBody::Body(body) = resp.body() {
if let actix_web::body::Body::Bytes(bytes) = body {
assert!(!bytes.is_empty());
assert!(twoway::find_bytes(
bytes.as_ref(),
format!("{}_dummy_gauge 42\n", metrics_prefix).as_bytes()
)
.is_some());
} else {
bail!("expected Body")
}
} else {
bail!("expected bytes in body")
};

Ok(())
}
}
10 changes: 10 additions & 0 deletions graph-builder/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::registry::{self, Registry};
use actix_web::{HttpRequest, HttpResponse};
use cincinnati::plugins::prelude::*;
use cincinnati::{AbstractRelease, Graph, Release, CONTENT_TYPE};
use commons::metrics::HasRegistry;
use commons::GraphError;
use failure::{Error, Fallible};
use futures::Future;
Expand Down Expand Up @@ -105,6 +106,7 @@ pub struct State {
live: Arc<RwLock<bool>>,
ready: Arc<RwLock<bool>>,
plugins: &'static [BoxedPlugin],
registry: &'static prometheus::Registry,
}

impl State {
Expand All @@ -115,13 +117,15 @@ impl State {
live: Arc<RwLock<bool>>,
ready: Arc<RwLock<bool>>,
plugins: &'static [BoxedPlugin],
registry: &'static prometheus::Registry,
) -> State {
State {
json,
mandatory_params,
live,
ready,
plugins,
registry,
}
}

Expand All @@ -136,6 +140,12 @@ impl State {
}
}

impl HasRegistry for State {
fn registry(&self) -> &'static prometheus::Registry {
self.registry
}
}

#[allow(clippy::useless_let_if_seq)]
pub fn run<'a>(settings: &'a config::AppSettings, state: &State) -> ! {
// Grow-only cache, mapping tag (hashed layers) to optional release metadata.
Expand Down
104 changes: 88 additions & 16 deletions graph-builder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,26 @@ extern crate structopt;
extern crate tempfile;

use crate::failure::ResultExt;
use graph_builder::{config, graph, graph::RwLock, status};

use actix_web::{App, HttpServer};
use cincinnati::plugins::prelude::*;
use commons::metrics::{self, HasRegistry};
use failure::Error;
use graph_builder::{config, graph, graph::RwLock, status};
use std::sync::Arc;
use std::thread;

/// Common prefix for graph-builder metrics.
pub static METRICS_PREFIX: &str = "cincinnati_gb";

fn main() -> Result<(), Error> {
let sys = actix::System::new("graph-builder");

let settings = config::AppSettings::assemble().context("could not assemble AppSettings")?;
env_logger::Builder::from_default_env()
.filter(Some(module_path!()), settings.verbosity)
.init();
debug!("application settings:\n{:#?}", settings);

let plugins: Vec<BoxedPlugin> = if settings.disable_quay_api_metadata {
Default::default()
} else {
Expand Down Expand Up @@ -69,13 +77,14 @@ fn main() -> Result<(), Error> {
})
)
};
let registry: prometheus::Registry = metrics::new_registry(Some(METRICS_PREFIX.to_string()))?;

env_logger::Builder::from_default_env()
.filter(Some(module_path!()), settings.verbosity)
.init();
debug!("application settings:\n{:#?}", settings);
let service_addr = (settings.address, settings.port);
let status_addr = (settings.status_address, settings.status_port);
let app_prefix = settings.path_prefix.clone();

let app_state = {
// Shared state.
let state = {
let json_graph = Arc::new(RwLock::new(String::new()));
let live = Arc::new(RwLock::new(false));
let ready = Arc::new(RwLock::new(false));
Expand All @@ -86,20 +95,18 @@ fn main() -> Result<(), Error> {
live.clone(),
ready.clone(),
Box::leak(Box::new(plugins)),
Box::leak(Box::new(registry)),
)
};

let service_addr = (settings.address, settings.port);
let status_addr = (settings.status_address, settings.status_port);
let app_prefix = settings.path_prefix.clone();

// Graph scraper
let graph_state = app_state.clone();
let graph_state = state.clone();
thread::spawn(move || graph::run(&settings, &graph_state));

// Status service.
graph::register_metrics(&status::PROM_REGISTRY)?;
let status_state = app_state.clone();
graph::register_metrics(state.registry())?;

let status_state = state.clone();
HttpServer::new(move || {
App::new()
.register_data(actix_web::web::Data::new(status_state.clone()))
Expand All @@ -109,7 +116,7 @@ fn main() -> Result<(), Error> {
)
.service(
actix_web::web::resource("/metrics")
.route(actix_web::web::get().to(status::serve_metrics)),
.route(actix_web::web::get().to(metrics::serve::<graph::State>)),
)
.service(
actix_web::web::resource("/readiness")
Expand All @@ -120,7 +127,7 @@ fn main() -> Result<(), Error> {
.start();

// Main service.
let main_state = app_state.clone();
let main_state = state.clone();
HttpServer::new(move || {
App::new()
.register_data(actix_web::web::Data::new(main_state.clone()))
Expand All @@ -136,3 +143,68 @@ fn main() -> Result<(), Error> {

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use crate::graph::State;
use actix_web::test::TestRequest;
use commons::metrics::HasRegistry;
use commons::testing;
use failure::{bail, Fallible};
use parking_lot::RwLock;
use prometheus::Registry;
use std::collections::HashSet;
use std::sync::Arc;

fn mock_state() -> State {
let json_graph = Arc::new(RwLock::new(String::new()));
let live = Arc::new(RwLock::new(false));
let ready = Arc::new(RwLock::new(false));

let plugins = Box::leak(Box::new([]));
let registry: &'static Registry = Box::leak(Box::new(
metrics::new_registry(Some(METRICS_PREFIX.to_string())).unwrap(),
));

State::new(
json_graph.clone(),
HashSet::new(),
live.clone(),
ready.clone(),
plugins,
registry,
)
}

#[test]
fn serve_metrics_basic() -> Fallible<()> {
let mut rt = testing::init_runtime()?;
let state = mock_state();

let registry = <dyn HasRegistry>::registry(&state);
graph::register_metrics(registry)?;
testing::dummy_gauge(registry, 42.0)?;

let http_req = TestRequest::default().data(state).to_http_request();
let metrics_call = metrics::serve::<graph::State>(http_req);
let resp = rt.block_on(metrics_call)?;

assert_eq!(resp.status(), 200);
if let actix_web::body::ResponseBody::Body(body) = resp.body() {
if let actix_web::body::Body::Bytes(bytes) = body {
assert!(!bytes.is_empty());
println!("{:?}", std::str::from_utf8(bytes.as_ref()));
assert!(
twoway::find_bytes(bytes.as_ref(), b"cincinnati_gb_dummy_gauge 42\n").is_some()
);
} else {
bail!("expected Body")
}
} else {
bail!("expected bytes in body")
};

Ok(())
}
}
Loading