Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,97 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.TriggerRunResult
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.client.Client
import org.opensearch.script.ScriptService
import org.opensearch.search.aggregations.Aggregation
import org.opensearch.search.aggregations.Aggregations
import org.opensearch.search.aggregations.support.AggregationPath
import java.lang.IllegalArgumentException

/** Service that handles executing Triggers */
class TriggerService(val client: Client, val scriptService: ScriptService) {

private val logger = LogManager.getLogger(TriggerService::class.java)

fun isTriggerActionable(ctx: TriggerExecutionContext, result: TriggerRunResult): Boolean {
fun isQueryLevelTriggerActionable(ctx: QueryLevelTriggerExecutionContext, result: QueryLevelTriggerRunResult): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null
return result.triggered && !suppress
}

fun runTrigger(monitor: Monitor, trigger: Trigger, ctx: TriggerExecutionContext): TriggerRunResult {
fun runQueryLevelTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
ctx: QueryLevelTriggerExecutionContext
): QueryLevelTriggerRunResult {
return try {
val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(trigger.condition.params)
.execute(ctx)
TriggerRunResult(trigger.name, triggered, null)
QueryLevelTriggerRunResult(trigger.name, triggered, null)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// if the script fails we need to send an alert so set triggered = true
TriggerRunResult(trigger.name, true, e)
QueryLevelTriggerRunResult(trigger.name, true, e)
}
}

@Suppress("UNCHECKED_CAST")
fun runBucketLevelTrigger(
monitor: Monitor,
trigger: BucketLevelTrigger,
ctx: BucketLevelTriggerExecutionContext
): BucketLevelTriggerRunResult {
return try {
val bucketIndices =
((ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)[trigger.id] as HashMap<*, *>)[BUCKET_INDICES] as List<*>
val parentBucketPath = ((ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
.get(trigger.id) as HashMap<*, *>)[PARENT_BUCKET_PATH] as String
val aggregationPath = AggregationPath.parse(parentBucketPath)
// TODO test this part by passing sub-aggregation path
var parentAgg = (ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
aggregationPath.pathElementsAsStringList.forEach { sub_agg ->
parentAgg = (parentAgg[sub_agg] as HashMap<*, *>)
}
val buckets = parentAgg[Aggregation.CommonFields.BUCKETS.preferredName] as List<*>
val selectedBuckets = mutableMapOf<String, AggregationResultBucket>()
for (bucketIndex in bucketIndices) {
val bucketDict = buckets[bucketIndex as Int] as Map<String, Any>
val bucketKeyValuesList = getBucketKeyValuesList(bucketDict)
val aggResultBucket = AggregationResultBucket(parentBucketPath, bucketKeyValuesList, bucketDict)
selectedBuckets[aggResultBucket.getBucketKeysHash()] = aggResultBucket
}
BucketLevelTriggerRunResult(trigger.name, null, selectedBuckets)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// TODO empty map here with error should be treated in the same way as QueryLevelTrigger with error running script
BucketLevelTriggerRunResult(trigger.name, e, emptyMap())
}
}

@Suppress("UNCHECKED_CAST")
private fun getBucketKeyValuesList(bucket: Map<String, Any>): List<String> {
val keyField = Aggregation.CommonFields.KEY.preferredName
val keyValuesList = mutableListOf<String>()
when {
bucket[keyField] is String -> keyValuesList.add(bucket[keyField] as String)
bucket[keyField] is Map<*, *> -> (bucket[keyField] as Map<String, Any>).values.map { keyValuesList.add(it as String) }
Comment thread
qreshi marked this conversation as resolved.
else -> throw IllegalArgumentException("Unexpected format for key in bucket [$bucket]")
}

return keyValuesList
}
}