Skip to content

Commit

Permalink
refactor(sherlock): Moved to single-use tf-serving architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
isala404 committed Mar 11, 2022
1 parent b0d1c92 commit 432a9df
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 52 deletions.
4 changes: 2 additions & 2 deletions sherlock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ lazy_static = "1.4.0"
hyper = { version = "0.14.17", features = ["full"] }
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["blocking", "json"] }
flate2 = "1.0.22"
tar = "0.4.38"
serde = "1.0"
serde_yaml = "0.8"
12 changes: 5 additions & 7 deletions sherlock/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ RUN cargo build --target x86_64-unknown-linux-musl --release
# Production Image
FROM alpine:latest
RUN apk --no-cache add ca-certificates

WORKDIR /app
# COPY the binary
COPY --from=builder /sherlock/target/x86_64-unknown-linux-musl/release/sherlock /sherlock
RUN mkdir -p /app/models
COPY --from=builder /sherlock/target/x86_64-unknown-linux-musl/release/sherlock /app/sherlock

# DEFAULT ENV
ENV SERVICE_NAME=sample_service
ENV NAMESPACE=default
ENV END_POINT=http://localhost:8501/v1/models/sherlock:predict
ENV END_POINT=http://localhost:8501/v1/models
ENV POOL_DURATION=60
ENV MODEL_URL=https://static.isala.me/lazy-koala/sherlock/models/sample_service.tar.gz
# Define port
EXPOSE 9898

CMD ["/sherlock"]
CMD ["/app/sherlock"]
101 changes: 101 additions & 0 deletions sherlock/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sherlock
  namespace: lazy-koala
spec:
  selector:
    matchLabels:
      name: sherlock
  template:
    metadata:
      labels:
        name: sherlock
      annotations:
        # Marks the pod for scraping by the lazy-koala metrics collector.
        lazy-koala/scrape: "true"
    spec:
      serviceAccountName: gke-workload-identity
      containers:
        # Exporter: polls the serving sidecar and exposes anomaly_score on :9898.
        - name: sherlock
          image: ghcr.io/mrsupiri/lazy-koala/sherlock:latest
          ports:
            - containerPort: 9898
              name: metrics
          env:
            - name: END_POINT
              value: "http://localhost:8501/v1/models/sherlock:predict"
            - name: POOL_DURATION
              value: "1"
          volumeMounts:
            - name: model-config
              mountPath: /app/config
        # Sidecar: keeps /models in sync with the GCS bucket once a minute.
        - name: model-poller
          image: google/cloud-sdk
          imagePullPolicy: IfNotPresent
          env:
            - name: BUCKET_NAME
              value: sherlock.isala.me
          command:
            - /bin/bash
            - -ce
            # Sync into the absolute mount path so the loop does not depend on
            # the image's working directory (the volume is mounted at /models).
            - |-
              while true; do gsutil -m rsync -d -r "gs://${BUCKET_NAME}" /models; sleep 60; done
          volumeMounts:
            - name: model-files
              mountPath: /models
        # Sidecar: tf-serving re-reads the model list every 60s.
        - name: serving
          image: tensorflow/serving
          imagePullPolicy: IfNotPresent
          args:
            - --model_config_file_poll_wait_seconds=60
            - --model_config_file=/config/models.config
          ports:
            - containerPort: 8501
              name: http
          env:
            - name: MODEL_NAME
              value: "sherlock"
          volumeMounts:
            - name: model-config
              mountPath: /config
            - name: model-files
              mountPath: /models
      volumes:
        # BUG FIX: the mounts above reference "model-config", but this volume
        # was declared as "sherlock-config", so the pod could never start.
        # The volume is renamed; it still projects the sherlock-config ConfigMap.
        - name: model-config
          configMap:
            name: sherlock-config
        - name: model-files
          emptyDir: {}
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: sherlock-config
  namespace: lazy-koala
data:
  # Model list consumed by tensorflow/serving via --model_config_file;
  # base_path entries point into the shared /models volume kept fresh by
  # the model-poller sidecar.
  # NOTE(review): the sherlock exporter's read_config() opens
  # config/config.yaml, which is not a key in this ConfigMap — confirm
  # where that file is expected to come from.
  models.config: |
    model_config_list {
      config {
        name: 'service_1'
        base_path: '/models/service_1/'
        model_platform: 'tensorflow'
      }
      config {
        name: 'service_2'
        base_path: '/models/service_2/'
        model_platform: 'tensorflow'
      }
    }
  # Commented-out ServiceAccount manifest kept here as a reference for the
  # GKE workload-identity setup; it is not applied from this ConfigMap.
  services.yaml: |
    # ---
    ## https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity
    # apiVersion: v1
    # kind: ServiceAccount
    # metadata:
    #   annotations:
    #     iam.gke.io/gcp-service-account: [email protected]
    #   name: gke-workload-identity
    #   namespace: lazy-koala
75 changes: 32 additions & 43 deletions sherlock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,26 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use prometheus::{Encoder, Gauge, TextEncoder};
use prometheus::{Encoder, GaugeVec, TextEncoder};
use std::{env::var, thread, time::Duration};
use lazy_static::lazy_static;
use prometheus::{labels, opts, register_gauge};
use prometheus::{opts, register_gauge_vec};
use std::collections::HashMap;
use flate2::read::GzDecoder;
use tar::Archive;
use serde_yaml;

lazy_static! {
static ref SERVICE_NAME: String = var("SERVICE_NAME").unwrap();
static ref NAMESPACE: String = var("NAMESPACE").unwrap();
static ref END_POINT: String = var("END_POINT").unwrap();
static ref POOL_DURATION: String = var("POOL_DURATION").unwrap();
static ref MODEL_URL: String = var("MODEL_URL").unwrap();
static ref ANOMLAY_GAUGE: Gauge = register_gauge!(opts!(
"anomaly_score",
"Reconstruction loss of the autoencoder",
labels! {"serviceName" => SERVICE_NAME.as_str(), "namespace" => NAMESPACE.as_str()}
))
static ref ANOMLAY_GAUGE: GaugeVec = register_gauge_vec!(
opts!(
"anomaly_score",
"Reconstruction loss of the autoencoder"
),
&["serviceName"]
)
.unwrap();
}

/// Fetch the model archive from `MODEL_URL` and extract it under `models/`.
///
/// The HTTP response body is fed through a gzip decoder directly into the
/// tar extractor (all three implement/consume `Read`, so the stages chain).
fn download_model() -> Result<(), Box<dyn std::error::Error>> {
    let response = reqwest::blocking::get(MODEL_URL.as_str())?;
    Archive::new(GzDecoder::new(response)).unpack("models/")?;
    Ok(())
}

async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
let encoder = TextEncoder::new();
Expand All @@ -51,42 +41,41 @@ async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error>
}


fn query_model() ->Result<f64, Box<dyn std::error::Error>> {
let resp = reqwest::blocking::get(END_POINT.as_str())?
.json::<HashMap<String, f64>>()?;
fn query_model(service: &String) ->Result<f64, Box<dyn std::error::Error>> {
println!("Querying {} model", service);
let endpoint = format!("{}/{}:predict", END_POINT.as_str(), service);
let resp = reqwest::blocking::get(endpoint)?.json::<HashMap<String, f64>>()?;
Ok(resp["predictions"])
}

/// Score every configured service once per `delay` seconds, forever.
///
/// Each iteration re-reads the service list via `read_config()` (so config
/// changes are picked up without a restart), queries the model for each
/// service, and publishes the score on the `anomaly_score` gauge labelled
/// with the service name. Errors are logged to stderr and skipped so a bad
/// service or a transient config problem never kills the polling loop.
///
/// (This span interleaved removed diff lines with the new body; this is the
/// coherent post-commit implementation.)
fn poll_anomaly_scores(delay: u64) {
    loop {
        match read_config() {
            Ok(services) => {
                for service in services.iter() {
                    match query_model(service) {
                        Ok(score) => ANOMLAY_GAUGE.with_label_values(&[service]).set(score),
                        Err(e) => eprintln!("Error while querying model: {}", e),
                    }
                }
            }
            Err(e) => eprintln!("Error while parsing config: {}", e),
        };
        thread::sleep(Duration::from_secs(delay));
    }
}

// NOTE(review): this #[allow] looks like a leftover — nothing in
// read_config() discards a must-use value; confirm it can be dropped.
#[allow(unused_must_use)]
/// Load the list of service names to monitor.
///
/// Opens `config/config.yaml` (relative to the process working directory)
/// and deserializes it as a YAML sequence of strings. Errors from the file
/// open or the YAML parse propagate to the caller.
// NOTE(review): the deployment mounts the ConfigMap at /app/config, whose
// keys are models.config and services.yaml — no config.yaml is visible
// there; confirm where this file actually comes from at runtime.
fn read_config() -> Result<Vec<String>, Box<dyn std::error::Error>> {
let f = std::fs::File::open("config/config.yaml")?;
let services: Vec<String> = serde_yaml::from_reader(f)?;
Ok(services)
}

#[tokio::main]
async fn main() {

// Forgive me father for i have sinned 🙏
// I couldn't figure out way to use reqwest's
// async response with GzDecoder 😭
thread::spawn(|| {
if let Err(err) = download_model() {
eprintln!("failed to download the model: {}", err);
std::process::exit(1);
}
}).join();


thread::spawn(|| poll_anomaly_scores(POOL_DURATION.as_str().parse::<u64>().unwrap()));

ANOMLAY_GAUGE.set(0.0);

let addr = ([0, 0, 0, 0], 9898).into();
println!("Listening on http://{}", addr);

Expand Down

0 comments on commit 432a9df

Please sign in to comment.