Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions server/src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,40 +135,32 @@ pub struct Message {

impl Message {
// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool {
if let Some(col) = column {
return get_field(&schema.fields, col).is_some();
}
true
pub fn valid(&self, schema: &Schema, column: &str) -> bool {
return get_field(&schema.fields, column).is_some();
}

pub fn extract_column_name(&self) -> Option<&str> {
let re = Regex::new(r"\{(.*?)\}").unwrap();
let tokens: Vec<&str> = re
pub fn extract_column_names(&self) -> Vec<&str> {
// the message can have either no column name ({column_name} not present) or any number of {column_name} present
Regex::new(r"\{(.*?)\}")
.unwrap()
.captures_iter(self.message.as_str())
.map(|cap| cap.get(1).unwrap().as_str())
.collect();
// the message can have either no column name ({column_name} not present) or one column name
// return Some only if there is exactly one column name present
if tokens.len() == 1 {
return Some(tokens[0]);
}
None
.collect()
}

// returns the message with the column name replaced with the value of the column
/// Returns the message with the column names replaced with the values in the column.
fn get(&self, event: RecordBatch) -> String {
if let Some(column) = self.extract_column_name() {
let mut replace_message = self.message.clone();
for column in self.extract_column_names() {
if let Some(value) = event.column_by_name(column) {
let arr = cast(value, &DataType::Utf8).unwrap();
let value = as_string_array(&arr).value(0);

return self
.message
.replace(&format!("{{{column}}}"), value.to_string().as_str());
replace_message =
replace_message.replace(&format!("{{{column}}}"), value.to_string().as_str());
}
}
self.message.clone()
replace_message
}
}

Expand Down
22 changes: 11 additions & 11 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,17 @@ pub async fn put_alert(

let schema = STREAM_INFO.schema(&stream_name)?;
for alert in &alerts.alerts {
let column = alert.message.extract_column_name();
let is_valid = alert.message.valid(&schema, column);
if !is_valid {
let col = column.unwrap_or("");
return Err(StreamError::InvalidAlertMessage(
alert.name.to_owned(),
col.to_string(),
));
}
if !alert.rule.valid_for_schema(&schema) {
return Err(StreamError::InvalidAlert(alert.name.to_owned()));
for column in alert.message.extract_column_names() {
let is_valid = alert.message.valid(&schema, column);
if !is_valid {
return Err(StreamError::InvalidAlertMessage(
alert.name.to_owned(),
column.to_string(),
));
}
if !alert.rule.valid_for_schema(&schema) {
return Err(StreamError::InvalidAlert(alert.name.to_owned()));
}
}
}

Expand Down