Skip to content

Commit

Permalink
chore: Added a data collection agent
Browse files Browse the repository at this point in the history
  • Loading branch information
isala404 committed Mar 30, 2022
1 parent 3e6e912 commit a46a587
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 15 deletions.
10 changes: 4 additions & 6 deletions control-plane/config/samples/loads.yaml

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions gazer/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ rules:
- get
- list
- watch
- apiGroups: ["apps"]
resources:
- deployments
verbs:
- get
- list
- watch
- apiGroups:
- extensions
resources:
Expand Down
2 changes: 1 addition & 1 deletion inspector/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ spec:
command: ["inspector"]
# args: ["-c", "while true; do echo hello; sleep 10;done"]
imagePullPolicy: Always
image: ghcr.io/mrsupiri/lazy-koala/inspector:commit-342ac6a8
image: ghcr.io/mrsupiri/lazy-koala/inspector:latest
ports:
- containerPort: 8090
name: http
Expand Down
74 changes: 74 additions & 0 deletions scripts/data-exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from flask import Flask, request, jsonify
from datetime import datetime
import pymongo
import os

app = Flask(__name__)


myclient = pymongo.MongoClient(os.getenv("MONGODB_URI"))
mydb = myclient["metrics"]


@app.route('/save', methods=['GET', 'POST'])
def add_message():
content = request.json
mycol = mydb[content['service']]
x = mycol.insert_one({"data": content['data'], "time": datetime.now()})
return jsonify({"success": True})

if __name__ == '__main__':
app.run(host= '0.0.0.0',debug=True)







# import grequests
# import urllib.parse
# import numpy as np

# services = [
# "service-1-18f76028",
# "service-10-18f76028",
# "service-2-18f76028",
# "service-3-18f76028",
# "service-4-18f76028",
# "service-5-18f76028",
# "service-6-18f76028",
# "service-7-18f76028",
# "service-8-18f76028",
# "service-9-18f76028",
# ]

# samples = ["1m", "5m", "15m"]

# metrics = [
# 'rate(requests_sent_total{serviceName="SERVICE_NAME"}[SAMPLE])',
# 'sum by (serviceName) (rate(requests_received_total{serviceName="SERVICE_NAME"}[SAMPLE]))',
# 'rate(request_duration_seconds_sum{serviceName="SERVICE_NAME"}[SAMPLE])',
# 'avg_over_time(cpu_seconds{serviceName="SERVICE_NAME"}[SAMPLE])',
# 'avg_over_time(memory_usage_bytes{serviceName="SERVICE_NAME"}[SAMPLE])',
# 'rate(acknowledged_bytes_sum{serviceName="SERVICE_NAME"}[SAMPLE])',
# 'rate(transmitted_bytes_sum{serviceName="SERVICE_NAME"}[SAMPLE])',
# 'avg_over_time(backlog{level="1",serviceName="SERVICE_NAME"}[SAMPLE])',
# 'sum by (serviceName) (avg_over_time(backlog{level!="1",serviceName="SERVICE_NAME"}[SAMPLE]))',
# ]

# requests = []

# def chunks(l, n):
# n = max(1, n)
# return (l[i:i+n] for i in range(0, len(l), n))

# for service in services:
# for sample in samples:
# for metric in metrics:
# query = metric.replace("SERVICE_NAME", service).replace("SAMPLE", sample)
# url = "http://127.0.0.1:9090/api/v1/query?query="+urllib.parse.quote_plus(query)
# requests.append(grequests.get(url))
# respon = grequests.map(requests)
# f
# break;
2 changes: 1 addition & 1 deletion scripts/loadTemplete.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import random

services = list(range(1,10))
services = list(range(1,11))

random.shuffle(services)

Expand Down
9 changes: 9 additions & 0 deletions sherlock/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ spec:
volumeMounts:
- name: sherlock-config
mountPath: /app/config
- name: data-exporter
imagePullPolicy: Always
image: asia.gcr.io/iconicto/data-exporter:latest
ports:
- containerPort: 5000
name: http
env:
- name: MONGODB_URI
value: "http://localhost:8501/v1/models"
- image: google/cloud-sdk
name: model-poller
imagePullPolicy: IfNotPresent
Expand Down
19 changes: 18 additions & 1 deletion sherlock/src/inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,27 @@ async fn query_model(service: &str, input: [[[f64; 1]; 9]; 10]) -> Result<f64, B
Ok(mse)
}

async fn save(service: &str, input: [[[f64; 3]; 9]; 10]) -> Result<(), Box<dyn std::error::Error>> {
let query = json!({
"service": service,
"data": input,
});

let client = reqwest::Client::new();
let _res = client.post("http://localhost:5000/save")
.json::<serde_json::Value>(&query)
.send()
.await?;

Ok(())
}

async fn calculate_anomaly_score(service: &str, args: &InferenceData) -> Result<(), Box<dyn std::error::Error>> {
println!("Calculate anomaly score for {} using {}", service, &args.model_name);
let input = build_telemetry_matrix(&service).await?;
let score = query_model(&args.model_name, input).await?;
save(&service, input).await?;
// let score = query_model(&args.model_name, input).await?;
let score = 0.4;
ANOMLAY_GAUGE.with_label_values(&[service, &args.namespace]).set(score);
println!("Anomaly score for {}: {}", service, score);
Ok(())
Expand Down
20 changes: 14 additions & 6 deletions sherlock/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ lazy_static! {
r#"avg_over_time(backlog{level="1",serviceName="SERVICE_NAME"}[SAMPLE])"#,
r#"sum by (serviceName) (avg_over_time(backlog{level!="1",serviceName="SERVICE_NAME"}[SAMPLE]))"#
];

static ref SAMPLES: [&'static str; 3] = [
"1m",
"5m",
"15m"
];
}


Expand All @@ -31,8 +37,8 @@ async fn query_prometheus(query: &str, time: DateTime<Local>) -> Result<f64, Bo
Ok(value)
}

pub async fn build_telemetry_matrix(service: &str) -> Result<[[[f64; 1]; 9]; 10], Box<dyn std::error::Error>> {
let mut data: [[[f64; 1]; 9]; 10] = [[[0.0; 1]; 9]; 10];
pub async fn build_telemetry_matrix(service: &str) -> Result<[[[f64; 3]; 9]; 10], Box<dyn std::error::Error>> {
let mut data: [[[f64; 3]; 9]; 10] = [[[0.0; 3]; 9]; 10];

let time_steps: [DateTime<Local>; 10] = [
Local::now(),
Expand All @@ -49,10 +55,12 @@ pub async fn build_telemetry_matrix(service: &str) -> Result<[[[f64; 1]; 9]; 10]

for (x, time_step) in time_steps.iter().enumerate() {
for (y, metric) in METRICS.iter().enumerate() {
let query = &metric.replace("SERVICE_NAME", service).replace("SAMPLE", "1m");
match query_prometheus(query, *time_step).await {
Ok(value) => data[x][y][0] = value,
Err(e) => return Err(e),
for (z, sample) in SAMPLES.iter().enumerate(){
let query = &metric.replace("SERVICE_NAME", service).replace("SAMPLE", sample);
match query_prometheus(query, *time_step).await {
Ok(value) => data[x][y][z] = value,
Err(e) => return Err(e),
}
}
}
}
Expand Down

0 comments on commit a46a587

Please sign in to comment.