Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,19 +229,15 @@ async fn update_alert_state(
trace!("ALERT!!!!!!");
let message = format_alert_message(agg_results);
ALERTS
.update_state(&alert.id.to_string(), AlertState::Triggered, Some(message))
.update_state(alert.id, AlertState::Triggered, Some(message))
.await
} else if ALERTS
.get_state(&alert.id)
.await?
.eq(&AlertState::Triggered)
{
} else if ALERTS.get_state(alert.id).await?.eq(&AlertState::Triggered) {
ALERTS
.update_state(&alert.id.to_string(), AlertState::Resolved, Some("".into()))
.update_state(alert.id, AlertState::Resolved, Some("".into()))
.await
} else {
ALERTS
.update_state(&alert.id.to_string(), AlertState::Resolved, None)
.update_state(alert.id, AlertState::Resolved, None)
.await
}
}
Expand Down
168 changes: 50 additions & 118 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/

use actix_web::http::header::ContentType;
use actix_web::web::Json;
use actix_web::{FromRequest, HttpRequest};
use alerts_utils::user_auth_for_query;
use async_trait::async_trait;
use chrono::Utc;
Expand All @@ -29,12 +27,11 @@ use once_cell::sync::Lazy;
use serde_json::Error as SerdeError;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display};
use std::future::Future;
use std::pin::Pin;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{trace, warn};
use ulid::Ulid;

pub mod alerts_utils;
pub mod target;
Expand All @@ -46,22 +43,20 @@ use crate::storage;
use crate::storage::ObjectStorageError;
use crate::sync::schedule_alert_task;
use crate::utils::time::TimeRange;
use crate::utils::{get_hash, uid};

use self::target::Target;

// these types describe the scheduled task for an alert
pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);
pub type ScheduledTasks = RwLock<HashMap<String, ScheduledTaskHandlers>>;

pub const CURRENT_ALERTS_VERSION: &str = "v1";

pub static ALERTS: Lazy<Alerts> = Lazy::new(Alerts::default);

#[derive(Debug, Default)]
pub struct Alerts {
pub alerts: RwLock<Vec<AlertConfig>>,
pub scheduled_tasks: ScheduledTasks,
pub alerts: RwLock<HashMap<Ulid, AlertConfig>>,
pub scheduled_tasks: RwLock<HashMap<Ulid, ScheduledTaskHandlers>>,
}

#[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)]
Expand Down Expand Up @@ -132,7 +127,7 @@ impl Context {

#[derive(Debug, Clone)]
pub struct AlertInfo {
alert_id: String,
alert_id: Ulid,
alert_name: String,
// message: String,
// reason: String,
Expand All @@ -142,7 +137,7 @@ pub struct AlertInfo {

impl AlertInfo {
pub fn new(
alert_id: String,
alert_id: Ulid,
alert_name: String,
alert_state: AlertState,
severity: String,
Expand All @@ -159,16 +154,12 @@ impl AlertInfo {
#[derive(Debug, Clone)]
pub struct DeploymentInfo {
deployment_instance: String,
deployment_id: uid::Uid,
deployment_id: Ulid,
deployment_mode: String,
}

impl DeploymentInfo {
pub fn new(
deployment_instance: String,
deployment_id: uid::Uid,
deployment_mode: String,
) -> Self {
pub fn new(deployment_instance: String, deployment_id: Ulid, deployment_mode: String) -> Self {
Self {
deployment_instance,
deployment_id,
Expand Down Expand Up @@ -405,43 +396,11 @@ pub struct AlertRequest {
pub targets: Vec<Target>,
}

impl FromRequest for AlertRequest {
type Error = actix_web::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;

fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future {
let body = Json::<AlertRequest>::from_request(req, payload);
let fut = async move {
let body = body.await?.into_inner();
Ok(body)
};

Box::pin(fut)
}
}

impl AlertRequest {
pub fn modify(self, alert: AlertConfig) -> AlertConfig {
AlertConfig {
version: alert.version,
id: alert.id,
severity: alert.severity,
title: self.title,
query: self.query,
alert_type: self.alert_type,
aggregate_config: self.aggregate_config,
eval_type: self.eval_type,
targets: self.targets,
state: AlertState::default(),
}
}
}

impl From<AlertRequest> for AlertConfig {
fn from(val: AlertRequest) -> AlertConfig {
AlertConfig {
version: AlertVerison::from(CURRENT_ALERTS_VERSION),
id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
id: Ulid::new(),
severity: val.severity,
title: val.title,
query: val.query,
Expand All @@ -458,7 +417,8 @@ impl From<AlertRequest> for AlertConfig {
#[serde(rename_all = "camelCase")]
pub struct AlertConfig {
pub version: AlertVerison,
pub id: String,
#[serde(default)]
pub id: Ulid,
pub severity: Severity,
pub title: String,
pub query: String,
Expand All @@ -467,11 +427,21 @@ pub struct AlertConfig {
pub eval_type: EvalConfig,
pub targets: Vec<Target>,
// for new alerts, state should be resolved
#[serde(default = "AlertState::default")]
#[serde(default)]
pub state: AlertState,
}

impl AlertConfig {
pub fn modify(&mut self, alert: AlertRequest) {
self.title = alert.title;
self.query = alert.query;
self.alert_type = alert.alert_type;
self.aggregate_config = alert.aggregate_config;
self.eval_type = alert.eval_type;
self.targets = alert.targets;
self.state = AlertState::default();
}

/// Validations
pub async fn validate(&self) -> Result<(), AlertError> {
// validate evalType
Expand Down Expand Up @@ -694,7 +664,7 @@ impl AlertConfig {

Context::new(
AlertInfo::new(
self.id.to_string(),
self.id,
self.title.clone(),
self.state,
self.severity.clone().to_string(),
Expand Down Expand Up @@ -757,33 +727,20 @@ impl actix_web::ResponseError for AlertError {
}

impl Alerts {
/// Loads alerts from disk
/// spawn scheduled tasks
/// Evaluate
/// Loads alerts from disk, blocks
pub async fn load(&self) -> Result<(), AlertError> {
let mut this = vec![];
let mut map = self.alerts.write().await;
let store = CONFIG.storage().get_object_store();
let all_alerts = store.get_alerts().await.unwrap_or_default();

for alert in all_alerts {
if alert.is_empty() {
continue;
}

let alert: AlertConfig = serde_json::from_slice(&alert)?;

for alert in store.get_alerts().await.unwrap_or_default() {
let (handle, rx, tx) =
schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;

self.update_task(&alert.id, handle, rx, tx).await;
self.update_task(alert.id, handle, rx, tx).await;

this.push(alert);
map.insert(alert.id, alert);
}

let mut s = self.alerts.write().await;
s.append(&mut this.clone());
drop(s);

Ok(())
}

Expand All @@ -793,7 +750,7 @@ impl Alerts {
session: SessionKey,
) -> Result<Vec<AlertConfig>, AlertError> {
let mut alerts: Vec<AlertConfig> = Vec::new();
for alert in self.alerts.read().await.iter() {
for (_, alert) in self.alerts.read().await.iter() {
// filter based on whether the user can execute this query or not
let query = &alert.query;
if user_auth_for_query(&session, query).await.is_ok() {
Expand All @@ -805,11 +762,9 @@ impl Alerts {
}

/// Returns a sigle alert that the user has access to (based on query auth)
pub async fn get_alert_by_id(&self, id: &str) -> Result<AlertConfig, AlertError> {
pub async fn get_alert_by_id(&self, id: Ulid) -> Result<AlertConfig, AlertError> {
let read_access = self.alerts.read().await;
let alert = read_access.iter().find(|a| a.id == id);

if let Some(alert) = alert {
if let Some(alert) = read_access.get(&id) {
Ok(alert.clone())
} else {
Err(AlertError::CustomError(format!(
Expand All @@ -820,15 +775,13 @@ impl Alerts {

/// Update the in-mem vector of alerts
pub async fn update(&self, alert: &AlertConfig) {
let mut s = self.alerts.write().await;
s.retain(|a| a.id != alert.id);
s.push(alert.clone());
self.alerts.write().await.insert(alert.id, alert.clone());
}

/// Update the state of alert
pub async fn update_state(
&self,
alert_id: &str,
alert_id: Ulid,
new_state: AlertState,
trigger_notif: Option<String>,
) -> Result<(), AlertError> {
Expand All @@ -847,8 +800,7 @@ impl Alerts {

// modify in memory
let mut writer = self.alerts.write().await;
let alert_to_update = writer.iter_mut().find(|alert| alert.id == alert_id);
if let Some(alert) = alert_to_update {
if let Some(alert) = writer.get_mut(&alert_id) {
trace!("in memory alert-\n{}", alert.state);
alert.state = new_state;
trace!("in memory updated alert-\n{}", alert.state);
Expand All @@ -864,20 +816,8 @@ impl Alerts {
}

/// Remove alert and scheduled task from disk and memory
pub async fn delete(&self, alert_id: &str) -> Result<(), AlertError> {
// delete from memory
let read_access = self.alerts.read().await;

let index = read_access
.iter()
.enumerate()
.find(|(_, alert)| alert.id == alert_id)
.to_owned();

if let Some((index, _)) = index {
// drop the read access in order to get exclusive write access
drop(read_access);
self.alerts.write().await.remove(index);
pub async fn delete(&self, alert_id: Ulid) -> Result<(), AlertError> {
if self.alerts.write().await.remove(&alert_id).is_some() {
trace!("removed alert from memory");
} else {
warn!("Alert ID- {alert_id} not found in memory!");
Expand All @@ -886,11 +826,10 @@ impl Alerts {
}

/// Get state of alert using alert_id
pub async fn get_state(&self, alert_id: &str) -> Result<AlertState, AlertError> {
pub async fn get_state(&self, alert_id: Ulid) -> Result<AlertState, AlertError> {
let read_access = self.alerts.read().await;
let alert = read_access.iter().find(|a| a.id == alert_id);

if let Some(alert) = alert {
if let Some(alert) = read_access.get(&alert_id) {
Ok(alert.state)
} else {
let msg = format!("No alert present for ID- {alert_id}");
Expand All @@ -901,33 +840,26 @@ impl Alerts {
/// Update the scheduled alert tasks in-memory map
pub async fn update_task(
&self,
id: &str,
id: Ulid,
handle: JoinHandle<()>,
rx: Receiver<()>,
tx: Sender<()>,
) {
let mut s = self.scheduled_tasks.write().await;
s.remove(id);
s.insert(id.to_owned(), (handle, rx, tx));
self.scheduled_tasks
.write()
.await
.insert(id, (handle, rx, tx));
}

/// Remove a scheduled alert task
pub async fn delete_task(&self, alert_id: &str) -> Result<(), AlertError> {
let read_access = self.scheduled_tasks.read().await;

let hashed_object = read_access.iter().find(|(id, _)| *id == alert_id);

if hashed_object.is_some() {
// drop the read access in order to get exclusive write access
drop(read_access);

// now delete from hashmap
let removed = self.scheduled_tasks.write().await.remove(alert_id);

if removed.is_none() {
trace!("Unable to remove alert task {alert_id} from hashmap");
}
} else {
pub async fn delete_task(&self, alert_id: Ulid) -> Result<(), AlertError> {
if self
.scheduled_tasks
.write()
.await
.remove(&alert_id)
.is_none()
{
trace!("Alert task {alert_id} not found in hashmap");
}

Expand Down
6 changes: 3 additions & 3 deletions src/alerts/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Target {
let retry = target_timeout.times;
let timeout = target_timeout.interval;
let target = self.target.clone();
let alert_id = alert_context.alert_info.alert_id.clone();
let alert_id = alert_context.alert_info.alert_id;

let sleep_and_check_if_call =
move |timeout_state: Arc<Mutex<TimeoutState>>, current_state: AlertState| {
Expand All @@ -128,7 +128,7 @@ impl Target {
tokio::spawn(async move {
match retry {
Retry::Infinite => loop {
let current_state = if let Ok(state) = ALERTS.get_state(&alert_id).await {
let current_state = if let Ok(state) = ALERTS.get_state(alert_id).await {
state
} else {
*state.lock().unwrap() = TimeoutState::default();
Expand All @@ -144,7 +144,7 @@ impl Target {
},
Retry::Finite(times) => {
for _ in 0..(times - 1) {
let current_state = if let Ok(state) = ALERTS.get_state(&alert_id).await {
let current_state = if let Ok(state) = ALERTS.get_state(alert_id).await {
state
} else {
*state.lock().unwrap() = TimeoutState::default();
Expand Down
Loading
Loading