feat(sherlock): Created model inference pipeline
isala404 committed Mar 15, 2022
1 parent 3f5f223 commit 9154586
Showing 5 changed files with 165 additions and 62 deletions.
7 changes: 6 additions & 1 deletion sherlock/Cargo.toml
@@ -12,4 +12,9 @@ hyper = { version = "0.14.17", features = ["full"] }
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8"
serde_yaml = "0.8"
serde_json = "1.0"
prometheus-http-query = "0.4.0"
chrono = "0.4.19"
smartcore = "0.2.0"
slice-of-array = "0.3.1"
1 change: 1 addition & 0 deletions sherlock/Dockerfile
@@ -20,6 +20,7 @@ COPY --from=builder /sherlock/target/x86_64-unknown-linux-musl/release/sherlock
# DEFAULT ENV
ENV END_POINT=http://localhost:8501/v1/models
ENV POOL_DURATION=60
ENV PROMETHEUS_END_POINT=http://127.0.0.1:9090
# Define port
EXPOSE 9898

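For a local smoke test, the container can be run with these variables overridden. Note that inference.rs reads TENSORFLOW_END_POINT while the Dockerfile defaults END_POINT, so a sketch (image tag hypothetical) would set the inference endpoint explicitly:

docker run -p 9898:9898 \
  -e TENSORFLOW_END_POINT=http://tf-serving:8501/v1/models \
  -e PROMETHEUS_END_POINT=http://prometheus:9090 \
  -e POOL_DURATION=60 \
  isala404/sherlock:latest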
88 changes: 88 additions & 0 deletions sherlock/src/inference.rs
@@ -0,0 +1,88 @@
use std::{time::Duration, collections::HashMap, env::var};
use serde::{Serialize, Deserialize};
use serde_json::json;
use lazy_static::lazy_static;
use prometheus::{opts, register_gauge_vec, GaugeVec};
use smartcore::metrics::mean_absolute_error::MeanAbsoluteError;
use crate::query::build_telemetry_matrix;
use slice_of_array::prelude::*;


lazy_static! {
    static ref TENSORFLOW_END_POINT: String = var("TENSORFLOW_END_POINT").unwrap_or("http://localhost:8501/v1/models".to_string());
    static ref POOL_DURATION: String = var("POOL_DURATION").unwrap_or("60".to_string());
    static ref ANOMALY_GAUGE: GaugeVec = register_gauge_vec!(
        opts!(
            "anomaly_score",
            "Reconstruction loss of the autoencoder"
        ),
        &["serviceName", "namespace"]
    )
    .unwrap();
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct InferenceData {
    model_name: String,
    namespace: String,
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModelResponse {
    pub predictions: Vec<Vec<Vec<Vec<f64>>>>,
}


fn parse_config() -> Result<HashMap<String, InferenceData>, Box<dyn std::error::Error>> {
    let f = std::fs::File::open("config/services.yaml")?;
    let services: HashMap<String, InferenceData> = serde_yaml::from_reader(f)?;
    Ok(services)
}

async fn query_model(service: &str, input: [[[f64; 1]; 9]; 10]) -> Result<f64, Box<dyn std::error::Error>> {
    let endpoint = format!("{}/{}:predict", TENSORFLOW_END_POINT.as_str(), service);

    let query = json!({
        "instances": [input],
    });

    let client = reqwest::Client::new();
    let res = client.post(endpoint)
        .json(&query)
        .send()
        .await?;

    // Flatten the nested prediction tensor into a plain Vec<f64> for scoring.
    let predictions = res.json::<ModelResponse>().await?.predictions.into_iter().flatten().flatten().flatten().collect::<Vec<f64>>();

    // Reconstruction error between the input window and the autoencoder's output.
    let mae: f64 = MeanAbsoluteError {}.get_score(&input.flat().flat().to_vec(), &predictions);

    Ok(mae)
}

async fn calculate_anomaly_score(service: &str, args: &InferenceData) -> Result<(), Box<dyn std::error::Error>> {
    println!("Querying {} model", service);
    let input = build_telemetry_matrix(service).await?;
    let score = query_model(&args.model_name, input).await?;
    ANOMALY_GAUGE.with_label_values(&[service, &args.namespace]).set(score);
    println!("Anomaly score for {}: {}", service, score);
    Ok(())
}

pub async fn poll_anomaly_scores() {
    let delay = POOL_DURATION.as_str().parse::<u64>().unwrap();

    loop {
        let services = parse_config().unwrap_or_default();

        for (service, args) in services.iter() {
            if let Err(err) = calculate_anomaly_score(service, args).await {
                eprintln!("Error while calculating anomaly score: {}", err);
                // -1 flags a service whose score could not be computed this cycle.
                ANOMALY_GAUGE.with_label_values(&[service, &args.namespace]).set(-1.0);
            }
        }
        // Sleep asynchronously so this task does not block the Tokio runtime.
        tokio::time::sleep(Duration::from_secs(delay)).await;
    }
}
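parse_config above reads config/services.yaml into a HashMap<String, InferenceData>; with rename_all = "camelCase", the model_name field is expected as modelName in the YAML. A minimal sketch of the expected file (service and model names hypothetical):

orders:
  modelName: orders-autoencoder
  namespace: default
checkout:
  modelName: checkout-autoencoder
  namespace: production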
70 changes: 9 additions & 61 deletions sherlock/src/main.rs
@@ -3,36 +3,16 @@ use hyper::{
    service::{make_service_fn, service_fn},
    Body, Request, Response, Server,
};
use prometheus::{Encoder, GaugeVec, TextEncoder};
use std::{env::var, thread, time::Duration, collections::HashMap};
use lazy_static::lazy_static;
use prometheus::{opts, register_gauge_vec};
use serde::{Serialize, Deserialize};
use prometheus::{Encoder, TextEncoder};
use crate::inference::poll_anomaly_scores;

lazy_static! {
    static ref END_POINT: String = var("END_POINT").unwrap_or("http://localhost:8501/v1/models".to_string());
    static ref POOL_DURATION: String = var("POOL_DURATION").unwrap_or("60".to_string());
    static ref ANOMALY_GAUGE: GaugeVec = register_gauge_vec!(
        opts!(
            "anomaly_score",
            "Reconstruction loss of the autoencoder"
        ),
        &["serviceName", "namespace"]
    )
    .unwrap();
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct InferenceData {
    model_name: String,
    namespace: String,
}
mod inference;
mod query;


async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn metrics_endpoint(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let encoder = TextEncoder::new();

    let metric_families = prometheus::gather();
    let mut buffer = vec![];
    encoder.encode(&metric_families, &mut buffer).unwrap();
@@ -47,49 +47,17 @@ async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error>
}


fn query_model(service: &str) -> Result<f64, Box<dyn std::error::Error>> {
    println!("Querying {} model", service);
    let endpoint = format!("{}/{}:predict", END_POINT.as_str(), service);
    let resp = reqwest::blocking::get(endpoint)?.json::<HashMap<String, f64>>()?;
    Ok(resp["predictions"])
}

fn poll_anomaly_scores(delay: u64) {
    loop {
        match read_config() {
            Ok(services) => {
                for (service, args) in services.iter() {
                    match query_model(&args.model_name) {
                        Ok(score) => ANOMALY_GAUGE.with_label_values(&[service, &args.namespace]).set(score),
                        Err(e) => {
                            eprintln!("Error while querying model: {}", e);
                            ANOMALY_GAUGE.with_label_values(&[service, &args.namespace]).set(-1.0)
                        },
                    }
                }
            },
            Err(e) => eprintln!("Error while parsing config: {}", e),
        };
        thread::sleep(Duration::from_secs(delay));
    }
}

fn read_config() -> Result<HashMap<String, InferenceData>, Box<dyn std::error::Error>> {
    let f = std::fs::File::open("config/services.yaml")?;
    let services: HashMap<String, InferenceData> = serde_yaml::from_reader(f)?;
    Ok(services)
}

#[tokio::main]
async fn main() {

    thread::spawn(|| poll_anomaly_scores(POOL_DURATION.as_str().parse::<u64>().unwrap()));
    tokio::spawn(async { poll_anomaly_scores().await });


    let addr = ([0, 0, 0, 0], 9898).into();
    println!("Listening on http://{}", addr);

    let serve_future = Server::bind(&addr).serve(make_service_fn(|_| async {
        Ok::<_, hyper::Error>(service_fn(serve_req))
        Ok::<_, hyper::Error>(service_fn(metrics_endpoint))
    }));

    if let Err(err) = serve_future.await {
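With the server up, the gauge can be scraped from port 9898; the handler ignores the request path, so any route works. Illustrative output (label values and scores made up):

$ curl http://localhost:9898/metrics
# HELP anomaly_score Reconstruction loss of the autoencoder
# TYPE anomaly_score gauge
anomaly_score{serviceName="orders",namespace="default"} 0.042
anomaly_score{serviceName="checkout",namespace="production"} -1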
61 changes: 61 additions & 0 deletions sherlock/src/query.rs
@@ -0,0 +1,61 @@
use std::env::var;
use prometheus_http_query::{Client, RangeVector};
use chrono::{Duration, Local, DateTime};
use lazy_static::lazy_static;

lazy_static! {
    static ref CLIENT: Client = Client::try_from(
        var("PROMETHEUS_END_POINT").unwrap_or("http://127.0.0.1:9090".to_string())
    ).unwrap();
    static ref METRICS: [&'static str; 9] = [
        r#"rate(requests_sent_total{serviceName="SERVICE_NAME"}[SAMPLE])"#,
        r#"sum by (serviceName) (rate(requests_received_total{serviceName="SERVICE_NAME"}[SAMPLE]))"#,
        r#"rate(request_duration_seconds_sum{serviceName="SERVICE_NAME"}[SAMPLE])"#,
        r#"avg_over_time(cpu_seconds{serviceName="SERVICE_NAME"}[SAMPLE])"#,
        r#"avg_over_time(memory_usage_bytes{serviceName="SERVICE_NAME"}[SAMPLE])"#,
        r#"rate(acknowledged_bytes_sum{serviceName="SERVICE_NAME"}[SAMPLE])"#,
        r#"rate(transmitted_bytes_sum{serviceName="SERVICE_NAME"}[SAMPLE])"#,
        r#"avg_over_time(backlog{level="1",serviceName="SERVICE_NAME"}[SAMPLE])"#,
        r#"sum by (serviceName) (avg_over_time(backlog{level!="1",serviceName="SERVICE_NAME"}[SAMPLE]))"#
    ];
}


async fn query_prometheus(query: &str, time: DateTime<Local>) -> Result<f64, Box<dyn std::error::Error>> {
    let v = RangeVector(query.to_string());

    let response = CLIENT.query(v, Some(time.timestamp()), None).await?;

    // Take the first sample of the first series; error out if the query returned nothing.
    let value = response
        .as_instant()
        .ok_or("query was empty")?
        .get(0)
        .ok_or("metric not found")?
        .sample()
        .value();

    Ok(value)
}

pub async fn build_telemetry_matrix(service: &str) -> Result<[[[f64; 1]; 9]; 10], Box<dyn std::error::Error>> {
    let mut data: [[[f64; 1]; 9]; 10] = [[[0.0; 1]; 9]; 10];

    // Ten sample points, exponentially spaced look-back: now, then 1, 2, 4, ..., 256 minutes ago.
    let time_steps: [DateTime<Local>; 10] = [
        Local::now(),
        Local::now() - Duration::minutes(1),
        Local::now() - Duration::minutes(2),
        Local::now() - Duration::minutes(4),
        Local::now() - Duration::minutes(8),
        Local::now() - Duration::minutes(16),
        Local::now() - Duration::minutes(32),
        Local::now() - Duration::minutes(64),
        Local::now() - Duration::minutes(128),
        Local::now() - Duration::minutes(256),
    ];

    // One row per time step, one column per metric template.
    for (x, time_step) in time_steps.iter().enumerate() {
        for (y, metric) in METRICS.iter().enumerate() {
            let query = metric.replace("SERVICE_NAME", service).replace("SAMPLE", "1m");
            match query_prometheus(&query, *time_step).await {
                Ok(value) => data[x][y][0] = value,
                Err(e) => return Err(e),
            }
        }
    }

    Ok(data)
}
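Before each call, build_telemetry_matrix substitutes the SERVICE_NAME and SAMPLE placeholders into the templates above; for a hypothetical service named orders, the first entry in METRICS expands to:

rate(requests_sent_total{serviceName="orders"}[1m])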
