diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index 81868d093..860c96e28 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -135,40 +135,32 @@ pub struct Message { impl Message { // checks if message (with a column name) is valid (i.e. the column name is present in the schema) - pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool { - if let Some(col) = column { - return get_field(&schema.fields, col).is_some(); - } - true + pub fn valid(&self, schema: &Schema, column: &str) -> bool { + return get_field(&schema.fields, column).is_some(); } - pub fn extract_column_name(&self) -> Option<&str> { - let re = Regex::new(r"\{(.*?)\}").unwrap(); - let tokens: Vec<&str> = re + pub fn extract_column_names(&self) -> Vec<&str> { + // the message can have either no column name ({column_name} not present) or any number of {column_name} present + Regex::new(r"\{(.*?)\}") + .unwrap() .captures_iter(self.message.as_str()) .map(|cap| cap.get(1).unwrap().as_str()) - .collect(); - // the message can have either no column name ({column_name} not present) or one column name - // return Some only if there is exactly one column name present - if tokens.len() == 1 { - return Some(tokens[0]); - } - None + .collect() } - // returns the message with the column name replaced with the value of the column + /// Returns the message with the column names replaced with the values in the column. fn get(&self, event: RecordBatch) -> String { - if let Some(column) = self.extract_column_name() { + let mut replace_message = self.message.clone(); + for column in self.extract_column_names() { if let Some(value) = event.column_by_name(column) { let arr = cast(value, &DataType::Utf8).unwrap(); let value = as_string_array(&arr).value(0); - return self - .message - .replace(&format!("{{{column}}}"), value.to_string().as_str()); + replace_message = + replace_message.replace(&format!("{{{column}}}"), value.to_string().as_str()); } } - self.message.clone() + replace_message } } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 5fa5fbb43..e1dd1a1cd 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -154,17 +154,17 @@ pub async fn put_alert( let schema = STREAM_INFO.schema(&stream_name)?; for alert in &alerts.alerts { - let column = alert.message.extract_column_name(); - let is_valid = alert.message.valid(&schema, column); - if !is_valid { - let col = column.unwrap_or(""); - return Err(StreamError::InvalidAlertMessage( - alert.name.to_owned(), - col.to_string(), - )); - } - if !alert.rule.valid_for_schema(&schema) { - return Err(StreamError::InvalidAlert(alert.name.to_owned())); + for column in alert.message.extract_column_names() { + let is_valid = alert.message.valid(&schema, column); + if !is_valid { + return Err(StreamError::InvalidAlertMessage( + alert.name.to_owned(), + column.to_string(), + )); + } + if !alert.rule.valid_for_schema(&schema) { + return Err(StreamError::InvalidAlert(alert.name.to_owned())); + } } }