diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index b96d5d363..c29d61573 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -229,19 +229,15 @@ async fn update_alert_state( trace!("ALERT!!!!!!"); let message = format_alert_message(agg_results); ALERTS - .update_state(&alert.id.to_string(), AlertState::Triggered, Some(message)) + .update_state(alert.id, AlertState::Triggered, Some(message)) .await - } else if ALERTS - .get_state(&alert.id) - .await? - .eq(&AlertState::Triggered) - { + } else if ALERTS.get_state(alert.id).await?.eq(&AlertState::Triggered) { ALERTS - .update_state(&alert.id.to_string(), AlertState::Resolved, Some("".into())) + .update_state(alert.id, AlertState::Resolved, Some("".into())) .await } else { ALERTS - .update_state(&alert.id.to_string(), AlertState::Resolved, None) + .update_state(alert.id, AlertState::Resolved, None) .await } } diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 0c2d0eb48..bd6603088 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -17,8 +17,6 @@ */ use actix_web::http::header::ContentType; -use actix_web::web::Json; -use actix_web::{FromRequest, HttpRequest}; use alerts_utils::user_auth_for_query; use async_trait::async_trait; use chrono::Utc; @@ -29,12 +27,11 @@ use once_cell::sync::Lazy; use serde_json::Error as SerdeError; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display}; -use std::future::Future; -use std::pin::Pin; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tracing::{trace, warn}; +use ulid::Ulid; pub mod alerts_utils; pub mod target; @@ -46,13 +43,11 @@ use crate::storage; use crate::storage::ObjectStorageError; use crate::sync::schedule_alert_task; use crate::utils::time::TimeRange; -use crate::utils::{get_hash, uid}; use self::target::Target; // these types describe the scheduled task for an alert pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>); -pub type ScheduledTasks = RwLock>; pub const CURRENT_ALERTS_VERSION: &str = "v1"; @@ -60,8 +55,8 @@ pub static ALERTS: Lazy = Lazy::new(Alerts::default); #[derive(Debug, Default)] pub struct Alerts { - pub alerts: RwLock>, - pub scheduled_tasks: ScheduledTasks, + pub alerts: RwLock>, + pub scheduled_tasks: RwLock>, } #[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)] @@ -132,7 +127,7 @@ impl Context { #[derive(Debug, Clone)] pub struct AlertInfo { - alert_id: String, + alert_id: Ulid, alert_name: String, // message: String, // reason: String, @@ -142,7 +137,7 @@ pub struct AlertInfo { impl AlertInfo { pub fn new( - alert_id: String, + alert_id: Ulid, alert_name: String, alert_state: AlertState, severity: String, @@ -159,16 +154,12 @@ impl AlertInfo { #[derive(Debug, Clone)] pub struct DeploymentInfo { deployment_instance: String, - deployment_id: uid::Uid, + deployment_id: Ulid, deployment_mode: String, } impl DeploymentInfo { - pub fn new( - deployment_instance: String, - deployment_id: uid::Uid, - deployment_mode: String, - ) -> Self { + pub fn new(deployment_instance: String, deployment_id: Ulid, deployment_mode: String) -> Self { Self { deployment_instance, deployment_id, @@ -405,43 +396,11 @@ pub struct AlertRequest { pub targets: Vec, } -impl FromRequest for AlertRequest { - type Error = actix_web::Error; - type Future = Pin>>>; - - fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future { - let body = Json::::from_request(req, payload); - let fut = async move { - let body = body.await?.into_inner(); - Ok(body) - }; - - Box::pin(fut) - } -} - -impl AlertRequest { - pub fn modify(self, alert: AlertConfig) -> AlertConfig { - AlertConfig { - version: alert.version, - id: alert.id, - severity: alert.severity, - title: self.title, - query: self.query, - alert_type: self.alert_type, - aggregate_config: self.aggregate_config, - eval_type: self.eval_type, - targets: self.targets, - state: AlertState::default(), - } - } -} - impl From for AlertConfig { fn from(val: AlertRequest) -> AlertConfig { AlertConfig { version: AlertVerison::from(CURRENT_ALERTS_VERSION), - id: get_hash(Utc::now().timestamp_micros().to_string().as_str()), + id: Ulid::new(), severity: val.severity, title: val.title, query: val.query, @@ -458,7 +417,8 @@ impl From for AlertConfig { #[serde(rename_all = "camelCase")] pub struct AlertConfig { pub version: AlertVerison, - pub id: String, + #[serde(default)] + pub id: Ulid, pub severity: Severity, pub title: String, pub query: String, @@ -467,11 +427,21 @@ pub struct AlertConfig { pub eval_type: EvalConfig, pub targets: Vec, // for new alerts, state should be resolved - #[serde(default = "AlertState::default")] + #[serde(default)] pub state: AlertState, } impl AlertConfig { + pub fn modify(&mut self, alert: AlertRequest) { + self.title = alert.title; + self.query = alert.query; + self.alert_type = alert.alert_type; + self.aggregate_config = alert.aggregate_config; + self.eval_type = alert.eval_type; + self.targets = alert.targets; + self.state = AlertState::default(); + } + /// Validations pub async fn validate(&self) -> Result<(), AlertError> { // validate evalType @@ -694,7 +664,7 @@ impl AlertConfig { Context::new( AlertInfo::new( - self.id.to_string(), + self.id, self.title.clone(), self.state, self.severity.clone().to_string(), @@ -757,33 +727,20 @@ impl actix_web::ResponseError for AlertError { } impl Alerts { - /// Loads alerts from disk - /// spawn scheduled tasks - /// Evaluate + /// Loads alerts from disk, blocks pub async fn load(&self) -> Result<(), AlertError> { - let mut this = vec![]; + let mut map = self.alerts.write().await; let store = CONFIG.storage().get_object_store(); - let all_alerts = store.get_alerts().await.unwrap_or_default(); - - for alert in all_alerts { - if alert.is_empty() { - continue; - } - - let alert: AlertConfig = serde_json::from_slice(&alert)?; + for alert in store.get_alerts().await.unwrap_or_default() { let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; - self.update_task(&alert.id, handle, rx, tx).await; + self.update_task(alert.id, handle, rx, tx).await; - this.push(alert); + map.insert(alert.id, alert); } - let mut s = self.alerts.write().await; - s.append(&mut this.clone()); - drop(s); - Ok(()) } @@ -793,7 +750,7 @@ impl Alerts { session: SessionKey, ) -> Result, AlertError> { let mut alerts: Vec = Vec::new(); - for alert in self.alerts.read().await.iter() { + for (_, alert) in self.alerts.read().await.iter() { // filter based on whether the user can execute this query or not let query = &alert.query; if user_auth_for_query(&session, query).await.is_ok() { @@ -805,11 +762,9 @@ impl Alerts { } /// Returns a sigle alert that the user has access to (based on query auth) - pub async fn get_alert_by_id(&self, id: &str) -> Result { + pub async fn get_alert_by_id(&self, id: Ulid) -> Result { let read_access = self.alerts.read().await; - let alert = read_access.iter().find(|a| a.id == id); - - if let Some(alert) = alert { + if let Some(alert) = read_access.get(&id) { Ok(alert.clone()) } else { Err(AlertError::CustomError(format!( @@ -820,15 +775,13 @@ impl Alerts { /// Update the in-mem vector of alerts pub async fn update(&self, alert: &AlertConfig) { - let mut s = self.alerts.write().await; - s.retain(|a| a.id != alert.id); - s.push(alert.clone()); + self.alerts.write().await.insert(alert.id, alert.clone()); } /// Update the state of alert pub async fn update_state( &self, - alert_id: &str, + alert_id: Ulid, new_state: AlertState, trigger_notif: Option, ) -> Result<(), AlertError> { @@ -847,8 +800,7 @@ impl Alerts { // modify in memory let mut writer = self.alerts.write().await; - let alert_to_update = writer.iter_mut().find(|alert| alert.id == alert_id); - if let Some(alert) = alert_to_update { + if let Some(alert) = writer.get_mut(&alert_id) { trace!("in memory alert-\n{}", alert.state); alert.state = new_state; trace!("in memory updated alert-\n{}", alert.state); @@ -864,20 +816,8 @@ impl Alerts { } /// Remove alert and scheduled task from disk and memory - pub async fn delete(&self, alert_id: &str) -> Result<(), AlertError> { - // delete from memory - let read_access = self.alerts.read().await; - - let index = read_access - .iter() - .enumerate() - .find(|(_, alert)| alert.id == alert_id) - .to_owned(); - - if let Some((index, _)) = index { - // drop the read access in order to get exclusive write access - drop(read_access); - self.alerts.write().await.remove(index); + pub async fn delete(&self, alert_id: Ulid) -> Result<(), AlertError> { + if self.alerts.write().await.remove(&alert_id).is_some() { trace!("removed alert from memory"); } else { warn!("Alert ID- {alert_id} not found in memory!"); @@ -886,11 +826,10 @@ impl Alerts { } /// Get state of alert using alert_id - pub async fn get_state(&self, alert_id: &str) -> Result { + pub async fn get_state(&self, alert_id: Ulid) -> Result { let read_access = self.alerts.read().await; - let alert = read_access.iter().find(|a| a.id == alert_id); - if let Some(alert) = alert { + if let Some(alert) = read_access.get(&alert_id) { Ok(alert.state) } else { let msg = format!("No alert present for ID- {alert_id}"); @@ -901,33 +840,26 @@ impl Alerts { /// Update the scheduled alert tasks in-memory map pub async fn update_task( &self, - id: &str, + id: Ulid, handle: JoinHandle<()>, rx: Receiver<()>, tx: Sender<()>, ) { - let mut s = self.scheduled_tasks.write().await; - s.remove(id); - s.insert(id.to_owned(), (handle, rx, tx)); + self.scheduled_tasks + .write() + .await + .insert(id, (handle, rx, tx)); } /// Remove a scheduled alert task - pub async fn delete_task(&self, alert_id: &str) -> Result<(), AlertError> { - let read_access = self.scheduled_tasks.read().await; - - let hashed_object = read_access.iter().find(|(id, _)| *id == alert_id); - - if hashed_object.is_some() { - // drop the read access in order to get exclusive write access - drop(read_access); - - // now delete from hashmap - let removed = self.scheduled_tasks.write().await.remove(alert_id); - - if removed.is_none() { - trace!("Unable to remove alert task {alert_id} from hashmap"); - } - } else { + pub async fn delete_task(&self, alert_id: Ulid) -> Result<(), AlertError> { + if self + .scheduled_tasks + .write() + .await + .remove(&alert_id) + .is_none() + { trace!("Alert task {alert_id} not found in hashmap"); } diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 664291aec..b92784cc4 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -104,7 +104,7 @@ impl Target { let retry = target_timeout.times; let timeout = target_timeout.interval; let target = self.target.clone(); - let alert_id = alert_context.alert_info.alert_id.clone(); + let alert_id = alert_context.alert_info.alert_id; let sleep_and_check_if_call = move |timeout_state: Arc>, current_state: AlertState| { @@ -128,7 +128,7 @@ impl Target { tokio::spawn(async move { match retry { Retry::Infinite => loop { - let current_state = if let Ok(state) = ALERTS.get_state(&alert_id).await { + let current_state = if let Ok(state) = ALERTS.get_state(alert_id).await { state } else { *state.lock().unwrap() = TimeoutState::default(); @@ -144,7 +144,7 @@ impl Target { }, Retry::Finite(times) => { for _ in 0..(times - 1) { - let current_state = if let Ok(state) = ALERTS.get_state(&alert_id).await { + let current_state = if let Ok(state) = ALERTS.get_state(alert_id).await { state } else { *state.lock().unwrap() = TimeoutState::default(); diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index c5b7069af..9bdeefc2c 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -17,14 +17,15 @@ */ use crate::{ - option::CONFIG, - storage::{object_storage::alert_json_path, ALERTS_ROOT_DIRECTORY}, - sync::schedule_alert_task, + option::CONFIG, storage::object_storage::alert_json_path, sync::schedule_alert_task, utils::actix::extract_session_key_from_req, }; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{ + web::{self, Json, Path}, + HttpRequest, Responder, +}; use bytes::Bytes; -use relative_path::RelativePathBuf; +use ulid::Ulid; use crate::alerts::{ alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS, @@ -41,7 +42,10 @@ pub async fn list(req: HttpRequest) -> Result { } // POST /alerts -pub async fn post(req: HttpRequest, alert: AlertRequest) -> Result { +pub async fn post( + req: HttpRequest, + Json(alert): Json, +) -> Result { let alert: AlertConfig = alert.into(); alert.validate().await?; @@ -57,26 +61,23 @@ pub async fn post(req: HttpRequest, alert: AlertRequest) -> Result Result { +pub async fn get(req: HttpRequest, alert_id: Path) -> Result { let session_key = extract_session_key_from_req(&req)?; - let id = req - .match_info() - .get("alert_id") - .ok_or(AlertError::Metadata("No alert ID Provided"))?; + let alert_id = alert_id.into_inner(); - let alert = ALERTS.get_alert_by_id(id).await?; + let alert = ALERTS.get_alert_by_id(alert_id).await?; // validate that the user has access to the tables mentioned user_auth_for_query(&session_key, &alert.query).await?; @@ -85,12 +86,9 @@ pub async fn get(req: HttpRequest) -> Result { // DELETE /alerts/{alert_id} /// Deletion should happen from disk, sheduled tasks, then memory -pub async fn delete(req: HttpRequest) -> Result { +pub async fn delete(req: HttpRequest, alert_id: Path) -> Result { let session_key = extract_session_key_from_req(&req)?; - let alert_id = req - .match_info() - .get("alert_id") - .ok_or(AlertError::Metadata("No alert ID Provided"))?; + let alert_id = alert_id.into_inner(); let alert = ALERTS.get_alert_by_id(alert_id).await?; @@ -118,57 +116,51 @@ pub async fn delete(req: HttpRequest) -> Result { // PUT /alerts/{alert_id} /// first save on disk, then in memory /// then modify scheduled task -pub async fn modify(req: HttpRequest, alert: AlertRequest) -> Result { +pub async fn modify( + req: HttpRequest, + alert_id: Path, + Json(alert_request): Json, +) -> Result { let session_key = extract_session_key_from_req(&req)?; - let alert_id = req - .match_info() - .get("alert_id") - .ok_or(AlertError::Metadata("No alert ID Provided"))?; + let alert_id = alert_id.into_inner(); // check if alert id exists in map - let old_alert = ALERTS.get_alert_by_id(alert_id).await?; + let mut alert = ALERTS.get_alert_by_id(alert_id).await?; // validate that the user has access to the tables mentioned // in the old as well as the modified alert - user_auth_for_query(&session_key, &old_alert.query).await?; user_auth_for_query(&session_key, &alert.query).await?; + user_auth_for_query(&session_key, &alert_request.query).await?; - let store = CONFIG.storage().get_object_store(); - - // fetch the alert object for the relevant ID - let old_alert_config: AlertConfig = serde_json::from_slice( - &store - .get_object(&RelativePathBuf::from_iter([ - ALERTS_ROOT_DIRECTORY, - &format!("{alert_id}.json"), - ])) - .await?, - )?; - - let alert = alert.modify(old_alert_config); + alert.modify(alert_request); alert.validate().await?; // modify task let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; // modify on disk - store.put_alert(&alert.id.to_string(), &alert).await?; + CONFIG + .storage() + .get_object_store() + .put_alert(alert.id, &alert) + .await?; // modify in memory ALERTS.update(&alert).await; - ALERTS.update_task(&alert.id, handle, rx, tx).await; + ALERTS.update_task(alert.id, handle, rx, tx).await; Ok(web::Json(alert)) } // PUT /alerts/{alert_id}/update_state -pub async fn update_state(req: HttpRequest, state: String) -> Result { +pub async fn update_state( + req: HttpRequest, + alert_id: Path, + state: String, +) -> Result { let session_key = extract_session_key_from_req(&req)?; - let alert_id = req - .match_info() - .get("alert_id") - .ok_or(AlertError::Metadata("No alert ID Provided"))?; + let alert_id = alert_id.into_inner(); // check if alert id exists in map let alert = ALERTS.get_alert_by_id(alert_id).await?; diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 588939349..cd29bac57 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -49,7 +49,8 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; -use tracing::error; +use tracing::{error, warn}; +use ulid::Ulid; use std::collections::BTreeMap; use std::fmt::Debug; @@ -255,7 +256,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { async fn put_alert( &self, - alert_id: &str, + alert_id: Ulid, alert: &AlertConfig, ) -> Result<(), ObjectStorageError> { self.put_object(&alert_json_path(alert_id), to_bytes(alert)) @@ -335,16 +336,23 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(serde_json::from_slice(&schema_map)?) } - async fn get_alerts(&self) -> Result, ObjectStorageError> { + async fn get_alerts(&self) -> Result, ObjectStorageError> { let alerts_path = RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY]); - let alerts_bytes = self + let alerts = self .get_objects( Some(&alerts_path), Box::new(|file_name| file_name.ends_with(".json")), ) - .await?; + .await? + .iter() + .filter_map(|bytes| { + serde_json::from_slice(bytes) + .inspect_err(|err| warn!("Expected compatible json, error = {err}")) + .ok() + }) + .collect(); - Ok(alerts_bytes) + Ok(alerts) } async fn upsert_stream_metadata( @@ -818,7 +826,7 @@ pub fn parseable_json_path() -> RelativePathBuf { /// TODO: Needs to be updated for distributed mode #[inline(always)] -pub fn alert_json_path(alert_id: &str) -> RelativePathBuf { +pub fn alert_json_path(alert_id: Ulid) -> RelativePathBuf { RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY, &format!("{alert_id}.json")]) }