@@ -25,7 +25,7 @@ use datafusion::{
2525 min_max:: { max, min} ,
2626 sum:: sum,
2727 } ,
28- prelude:: { col, lit, Expr } ,
28+ prelude:: { col, lit, DataFrame , Expr } ,
2929} ;
3030use tracing:: trace;
3131
@@ -41,8 +41,8 @@ use crate::{
4141} ;
4242
4343use super :: {
44- AggregateConfig , AggregateOperation , Aggregations , AlertConfig , AlertError , AlertOperator ,
45- AlertState , ConditionConfig , Conditions , ALERTS ,
44+ AggregateConfig , AggregateOperation , AggregateResult , Aggregations , AlertConfig , AlertError ,
45+ AlertOperator , AlertState , ConditionConfig , Conditions , ALERTS ,
4646} ;
4747
4848async fn get_tables_from_query ( query : & str ) -> Result < TableScanVisitor , AlertError > {
@@ -100,6 +100,16 @@ pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Resul
100100pub async fn evaluate_alert ( alert : & AlertConfig ) -> Result < ( ) , AlertError > {
101101 trace ! ( "RUNNING EVAL TASK FOR- {alert:?}" ) ;
102102
103+ let query = prepare_query ( alert) . await ?;
104+ let base_df = execute_base_query ( & query, & alert. query ) . await ?;
105+ let agg_results = evaluate_aggregates ( & alert. aggregate_config , & base_df) . await ?;
106+ let final_res = calculate_final_result ( & alert. aggregate_config , & agg_results) ;
107+
108+ update_alert_state ( alert, final_res, & agg_results) . await ?;
109+ Ok ( ( ) )
110+ }
111+
112+ async fn prepare_query ( alert : & AlertConfig ) -> Result < crate :: query:: Query , AlertError > {
103113 let ( start_time, end_time) = match & alert. eval_type {
104114 super :: EvalConfig :: RollingWindow ( rolling_window) => {
105115 ( & rolling_window. eval_start , & rolling_window. eval_end )
@@ -109,195 +119,158 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
109119 let session_state = QUERY_SESSION . state ( ) ;
110120 let raw_logical_plan = session_state. create_logical_plan ( & alert. query ) . await ?;
111121
112- // TODO: Filter tags should be taken care of!!!
113122 let time_range = TimeRange :: parse_human_time ( start_time, end_time)
114123 . map_err ( |err| AlertError :: CustomError ( err. to_string ( ) ) ) ?;
115124
116- let query = crate :: query:: Query {
125+ Ok ( crate :: query:: Query {
117126 raw_logical_plan,
118127 time_range,
119128 filter_tag : None ,
120- } ;
129+ } )
130+ }
121131
122- // for now proceed in a similar fashion as we do in query
123- // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
124- let stream_name = if let Some ( stream_name) = query. first_table_name ( ) {
125- stream_name
126- } else {
127- return Err ( AlertError :: CustomError ( format ! (
128- "Table name not found in query- {}" ,
129- alert. query
130- ) ) ) ;
131- } ;
132+ async fn execute_base_query (
133+ query : & crate :: query:: Query ,
134+ original_query : & str ,
135+ ) -> Result < DataFrame , AlertError > {
136+ let stream_name = query. first_table_name ( ) . ok_or_else ( || {
137+ AlertError :: CustomError ( format ! ( "Table name not found in query- {}" , original_query) )
138+ } ) ?;
132139
133- let base_df = query
140+ query
134141 . get_dataframe ( stream_name)
135142 . await
136- . map_err ( |err| AlertError :: CustomError ( err. to_string ( ) ) ) ?;
143+ . map_err ( |err| AlertError :: CustomError ( err. to_string ( ) ) )
144+ }
137145
138- let mut agg_results = vec ! [ ] ;
139-
140- let agg_filter_exprs = get_exprs ( & alert. aggregate_config ) ;
141-
142- let final_res = match & alert. aggregate_config . operator {
143- Some ( op) => {
144- match op {
145- AggregateCondition :: And | AggregateCondition :: Or => {
146- let agg1 = & alert. aggregate_config . aggregate_conditions [ 0 ] ;
147- let agg2 = & alert. aggregate_config . aggregate_conditions [ 1 ] ;
148-
149- for ( ( agg_expr, filter) , agg) in agg_filter_exprs. into_iter ( ) . zip ( [ agg1, agg2] )
150- {
151- let filtered_df = if let Some ( filter) = filter {
152- base_df. clone ( ) . filter ( filter) ?
153- } else {
154- base_df. clone ( )
155- } ;
156-
157- let aggregated_rows = filtered_df
158- . aggregate ( vec ! [ ] , vec ! [ agg_expr] ) ?
159- . collect ( )
160- . await ?;
161-
162- let final_value = get_final_value ( aggregated_rows) ;
163-
164- // now compare
165- let res = match & agg. operator {
166- AlertOperator :: GreaterThan => final_value > agg. value ,
167- AlertOperator :: LessThan => final_value < agg. value ,
168- AlertOperator :: EqualTo => final_value == agg. value ,
169- AlertOperator :: NotEqualTo => final_value != agg. value ,
170- AlertOperator :: GreaterThanEqualTo => final_value >= agg. value ,
171- AlertOperator :: LessThanEqualTo => final_value <= agg. value ,
172- _ => unreachable ! ( ) ,
173- } ;
174-
175- let message = if res {
176- if agg. condition_config . is_some ( ) {
177- Some (
178- agg. condition_config
179- . as_ref ( )
180- . unwrap ( )
181- . generate_filter_message ( ) ,
182- )
183- } else {
184- Some ( String :: default ( ) )
185- }
186- } else {
187- None
188- } ;
189-
190- agg_results. push ( ( res, message, agg, final_value) ) ;
191- }
192- let res = match & alert. aggregate_config . operator . clone ( ) . unwrap ( ) {
193- AggregateCondition :: And => agg_results. iter ( ) . all ( |( r, _, _, _) | * r) ,
194- AggregateCondition :: Or => agg_results. iter ( ) . any ( |( r, _, _, _) | * r) ,
195- } ;
146+ async fn evaluate_aggregates (
147+ agg_config : & Aggregations ,
148+ base_df : & DataFrame ,
149+ ) -> Result < Vec < AggregateResult > , AlertError > {
150+ let agg_filter_exprs = get_exprs ( agg_config) ;
151+ let mut results = Vec :: new ( ) ;
196152
197- res
198- }
199- }
200- }
201- None => {
202- let agg = & alert. aggregate_config . aggregate_conditions [ 0 ] ;
203- let ( agg_expr, filter) = & agg_filter_exprs[ 0 ] ;
204- let filtered_df = if let Some ( filter) = filter {
205- base_df. filter ( filter. clone ( ) ) ?
206- } else {
207- base_df
208- } ;
153+ let conditions = match & agg_config. operator {
154+ Some ( _) => & agg_config. aggregate_conditions [ 0 ..2 ] ,
155+ None => & agg_config. aggregate_conditions [ 0 ..1 ] ,
156+ } ;
209157
210- let aggregated_rows = filtered_df
211- . aggregate ( vec ! [ ] , vec ! [ agg_expr. clone( ) ] ) ?
212- . collect ( )
213- . await ?;
214-
215- let final_value = get_final_value ( aggregated_rows) ;
216-
217- // now compare
218- let res = match & agg. operator {
219- AlertOperator :: GreaterThan => final_value > agg. value ,
220- AlertOperator :: LessThan => final_value < agg. value ,
221- AlertOperator :: EqualTo => final_value == agg. value ,
222- AlertOperator :: NotEqualTo => final_value != agg. value ,
223- AlertOperator :: GreaterThanEqualTo => final_value >= agg. value ,
224- AlertOperator :: LessThanEqualTo => final_value <= agg. value ,
225- _ => unreachable ! ( ) ,
226- } ;
158+ for ( ( agg_expr, filter) , agg) in agg_filter_exprs. into_iter ( ) . zip ( conditions) {
159+ let result = evaluate_single_aggregate ( base_df, filter, agg_expr, agg) . await ?;
160+ results. push ( result) ;
161+ }
227162
228- let message = if res {
229- if agg . condition_config . is_some ( ) {
230- Some (
231- agg . condition_config
232- . as_ref ( )
233- . unwrap ( )
234- . generate_filter_message ( ) ,
235- )
236- } else {
237- Some ( String :: default ( ) )
238- }
239- } else {
240- None
241- } ;
163+ Ok ( results )
164+ }
165+
166+ async fn evaluate_single_aggregate (
167+ base_df : & DataFrame ,
168+ filter : Option < Expr > ,
169+ agg_expr : Expr ,
170+ agg : & AggregateConfig ,
171+ ) -> Result < AggregateResult , AlertError > {
172+ let filtered_df = if let Some ( filter ) = filter {
173+ base_df . clone ( ) . filter ( filter ) ?
174+ } else {
175+ base_df . clone ( )
176+ } ;
242177
243- agg_results. push ( ( res, message, agg, final_value) ) ;
178+ let aggregated_rows = filtered_df
179+ . aggregate ( vec ! [ ] , vec ! [ agg_expr] ) ?
180+ . collect ( )
181+ . await ?;
244182
245- res
246- }
183+ let final_value = get_final_value ( aggregated_rows) ;
184+ let result = evaluate_condition ( & agg. operator , final_value, agg. value ) ;
185+
186+ let message = if result {
187+ agg. condition_config
188+ . as_ref ( )
189+ . map ( |config| config. generate_filter_message ( ) )
190+ . or ( Some ( String :: default ( ) ) )
191+ } else {
192+ None
247193 } ;
248194
249- trace ! (
250- "alert.state.eq(&AlertState::Triggered)-\n {}" ,
251- alert. state. eq( & AlertState :: Triggered )
252- ) ;
253- trace ! ( "final_res- {final_res}" ) ;
195+ Ok ( AggregateResult {
196+ result,
197+ message,
198+ config : agg. clone ( ) ,
199+ value : final_value,
200+ } )
201+ }
254202
255- if final_res {
256- trace ! ( "ALERT!!!!!!" ) ;
203+ fn evaluate_condition ( operator : & AlertOperator , actual : f64 , expected : f64 ) -> bool {
204+ match operator {
205+ AlertOperator :: GreaterThan => actual > expected,
206+ AlertOperator :: LessThan => actual < expected,
207+ AlertOperator :: EqualTo => actual == expected,
208+ AlertOperator :: NotEqualTo => actual != expected,
209+ AlertOperator :: GreaterThanEqualTo => actual >= expected,
210+ AlertOperator :: LessThanEqualTo => actual <= expected,
211+ _ => unreachable ! ( ) ,
212+ }
213+ }
257214
258- let mut message = String :: default ( ) ;
259- for ( _, filter_msg, agg_config, final_value) in agg_results {
260- if let Some ( msg) = filter_msg {
261- message. extend ( [ format ! (
262- "|{}({}) WHERE ({}) {} {} (ActualValue: {})|" ,
263- agg_config. agg,
264- agg_config. column,
265- msg,
266- agg_config. operator,
267- agg_config. value,
268- final_value
269- ) ] ) ;
270- } else {
271- message. extend ( [ format ! (
272- "|{}({}) {} {} (ActualValue: {})" ,
273- agg_config. agg,
274- agg_config. column,
275- agg_config. operator,
276- agg_config. value,
277- final_value
278- ) ] ) ;
279- }
280- }
215+ fn calculate_final_result ( agg_config : & Aggregations , results : & [ AggregateResult ] ) -> bool {
216+ match & agg_config. operator {
217+ Some ( AggregateCondition :: And ) => results. iter ( ) . all ( |r| r. result ) ,
218+ Some ( AggregateCondition :: Or ) => results. iter ( ) . any ( |r| r. result ) ,
219+ None => results. first ( ) . map_or ( false , |r| r. result ) ,
220+ }
221+ }
281222
282- // update state
223+ async fn update_alert_state (
224+ alert : & AlertConfig ,
225+ final_res : bool ,
226+ agg_results : & [ AggregateResult ] ,
227+ ) -> Result < ( ) , AlertError > {
228+ if final_res {
229+ trace ! ( "ALERT!!!!!!" ) ;
230+ let message = format_alert_message ( agg_results) ;
283231 ALERTS
284232 . update_state ( & alert. id . to_string ( ) , AlertState :: Triggered , Some ( message) )
285- . await ? ;
233+ . await
286234 } else if ALERTS
287235 . get_state ( & alert. id )
288236 . await ?
289237 . eq ( & AlertState :: Triggered )
290238 {
291239 ALERTS
292240 . update_state ( & alert. id . to_string ( ) , AlertState :: Resolved , Some ( "" . into ( ) ) )
293- . await ? ;
241+ . await
294242 } else {
295243 ALERTS
296244 . update_state ( & alert. id . to_string ( ) , AlertState :: Resolved , None )
297- . await ? ;
245+ . await
298246 }
247+ }
299248
300- Ok ( ( ) )
249+ fn format_alert_message ( agg_results : & [ AggregateResult ] ) -> String {
250+ let mut message = String :: default ( ) ;
251+ for result in agg_results {
252+ if let Some ( msg) = & result. message {
253+ message. extend ( [ format ! (
254+ "|{}({}) WHERE ({}) {} {} (ActualValue: {})|" ,
255+ result. config. agg,
256+ result. config. column,
257+ msg,
258+ result. config. operator,
259+ result. config. value,
260+ result. value
261+ ) ] ) ;
262+ } else {
263+ message. extend ( [ format ! (
264+ "|{}({}) {} {} (ActualValue: {})" ,
265+ result. config. agg,
266+ result. config. column,
267+ result. config. operator,
268+ result. config. value,
269+ result. value
270+ ) ] ) ;
271+ }
272+ }
273+ message
301274}
302275
303276fn get_final_value ( aggregated_rows : Vec < RecordBatch > ) -> f64 {
0 commit comments