Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 11 additions & 25 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.opensearch.alerting.model.action.Action.Companion.SUBJECT
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.model.action.PerExecutionActionScope
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
Expand All @@ -74,7 +75,7 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings.Companion.HOST_DENY_LIST_NONE
import org.opensearch.alerting.util.getActionScope
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.isADMonitor
Expand Down Expand Up @@ -486,9 +487,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
monitorOrTriggerError = monitorOrTriggerError
)
for (action in trigger.actions) {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && !shouldDefaultToPerExecution) {
val perAlertActionScope = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionScope.actionableAlerts) {
// ActionExecutionPolicy should not be null for Bucket-Level Monitors since it has a default config when not set explicitly
val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alertCategory in actionExecutionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
val actionCtx = getActionContextForAlertCategory(
Expand All @@ -502,19 +504,20 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

// Keeping the throttled response separate from runAction for now since
// throttling is not supported for PER_EXECUTION
val actionResult = if (isBucketLevelTriggerActionThrottled(action, alert)) {
ActionRunResult(action.id, action.name, mapOf(), true, null, null)
} else {
val actionResult = if (isActionActionable(action, alert)) {
runAction(action, actionCtx, dryrun)
} else {
ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}

triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult)
alertsToUpdate.add(alert)
// Remove the alert from completedAlertsToUpdate in case it is present there since
// its update will be handled in the alertsToUpdate batch
completedAlertsToUpdate.remove(alert)
}
}
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || shouldDefaultToPerExecution) {
} else if (actionExecutionScope is PerExecutionActionScope || shouldDefaultToPerExecution) {
// If all categories of Alerts are empty, there is nothing to message on and we can skip the Action.
// If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified.
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue
Expand Down Expand Up @@ -638,23 +641,6 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
return true
}

// TODO: Add unit test for this method (or at least cover it in MonitorRunnerIT)
// Bucket-Level Monitors use the throttle configurations defined in ActionExecutionPolicy, this method evaluates that configuration.
private fun isBucketLevelTriggerActionThrottled(action: Action, alert: Alert): Boolean {
if (action.actionExecutionPolicy.throttle == null) return false
// TODO: This will need to be updated if throttleEnabled is moved to ActionExecutionPolicy
if (action.throttleEnabled) {
val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id }
val lastExecutionTime: Instant? = result?.lastExecutionTime
val throttledTimeBound = currentTime().minus(
action.actionExecutionPolicy.throttle.value.toLong(),
action.actionExecutionPolicy.throttle.unit
)
return !(lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound))
}
return false
}

private fun getActionContextForAlertCategory(
alertCategory: AlertCategory,
alert: Alert,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ data class Action(
val throttleEnabled: Boolean,
val throttle: Throttle?,
val id: String = UUIDs.base64UUID(),
val actionExecutionPolicy: ActionExecutionPolicy = ActionExecutionPolicy.getDefaultConfiguration()
val actionExecutionPolicy: ActionExecutionPolicy? = null
) : Writeable, ToXContentObject {

init {
if (subjectTemplate != null) {
require(subjectTemplate.lang == MUSTACHE) { "subject_template must be a mustache script" }
}
require(messageTemplate.lang == MUSTACHE) { "message_template must be a mustache script" }

if (actionExecutionPolicy?.actionExecutionScope is PerExecutionActionScope) {
require(throttle == null) { "Throttle is currently not supported for per execution action scope" }
}
}

@Throws(IOException::class)
Expand All @@ -68,7 +72,7 @@ data class Action(
sin.readBoolean(), // throttleEnabled
sin.readOptionalWriteable(::Throttle), // throttle
sin.readString(), // id
ActionExecutionPolicy(sin) // actionExecutionPolicy
sin.readOptionalWriteable(::ActionExecutionPolicy) // actionExecutionPolicy
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -78,13 +82,15 @@ data class Action(
.field(DESTINATION_ID_FIELD, destinationId)
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.field(THROTTLE_ENABLED_FIELD, throttleEnabled)
.field(ACTION_EXECUTION_POLICY_FIELD, actionExecutionPolicy)
if (subjectTemplate != null) {
xContentBuilder.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate)
}
if (throttle != null) {
xContentBuilder.field(THROTTLE_FIELD, throttle)
}
if (actionExecutionPolicy != null) {
xContentBuilder.field(ACTION_EXECUTION_POLICY_FIELD, actionExecutionPolicy)
}
return xContentBuilder.endObject()
}

Expand All @@ -111,7 +117,12 @@ data class Action(
out.writeBoolean(false)
}
out.writeString(id)
actionExecutionPolicy.writeTo(out)
if (actionExecutionPolicy != null) {
out.writeBoolean(true)
actionExecutionPolicy.writeTo(out)
} else {
out.writeBoolean(false)
}
}

companion object {
Expand All @@ -138,7 +149,7 @@ data class Action(
lateinit var messageTemplate: Script
var throttleEnabled = false
var throttle: Throttle? = null
var actionExecutionPolicy: ActionExecutionPolicy = ActionExecutionPolicy.getDefaultConfiguration()
var actionExecutionPolicy: ActionExecutionPolicy? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -160,7 +171,11 @@ data class Action(
throttleEnabled = xcp.booleanValue()
}
ACTION_EXECUTION_POLICY_FIELD -> {
actionExecutionPolicy = ActionExecutionPolicy.parse(xcp)
actionExecutionPolicy = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
null
} else {
ActionExecutionPolicy.parse(xcp)
}
}
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing action")
Expand All @@ -180,7 +195,7 @@ data class Action(
throttleEnabled,
throttle,
id = requireNotNull(id),
actionExecutionPolicy = requireNotNull(actionExecutionPolicy) { "Action execution policy is null" }
actionExecutionPolicy = actionExecutionPolicy
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,41 +25,23 @@ import java.io.IOException
/**
* This class represents the container for various configurations which control Action behavior.
*/
// TODO: Should throttleEnabled be included in here as well?
data class ActionExecutionPolicy(
val throttle: Throttle? = null,
val actionExecutionScope: ActionExecutionScope
) : Writeable, ToXContentObject {

init {
if (actionExecutionScope is PerExecutionActionScope) {
require(throttle == null) { "Throttle is currently not supported for per execution action scope" }
}
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this (
sin.readOptionalWriteable(::Throttle), // throttle
ActionExecutionScope.readFrom(sin) // actionExecutionScope
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val xContentBuilder = builder.startObject()
if (throttle != null) {
xContentBuilder.field(THROTTLE_FIELD, throttle)
}
xContentBuilder.field(ACTION_EXECUTION_SCOPE, actionExecutionScope)
return xContentBuilder.endObject()
builder.startObject()
.field(ACTION_EXECUTION_SCOPE, actionExecutionScope)
return builder.endObject()
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
if (throttle != null) {
out.writeBoolean(true)
throttle.writeTo(out)
} else {
out.writeBoolean(false)
}
if (actionExecutionScope is PerAlertActionScope) {
out.writeEnum(ActionExecutionScope.Type.PER_ALERT)
} else {
Expand All @@ -69,13 +51,11 @@ data class ActionExecutionPolicy(
}

companion object {
const val THROTTLE_FIELD = "throttle"
const val ACTION_EXECUTION_SCOPE = "action_execution_scope"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ActionExecutionPolicy {
var throttle: Throttle? = null
lateinit var actionExecutionScope: ActionExecutionScope

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -84,15 +64,11 @@ data class ActionExecutionPolicy(
xcp.nextToken()

when (fieldName) {
THROTTLE_FIELD -> {
throttle = if (xcp.currentToken() == Token.VALUE_NULL) null else Throttle.parse(xcp)
}
ACTION_EXECUTION_SCOPE -> actionExecutionScope = ActionExecutionScope.parse(xcp)
}
}

return ActionExecutionPolicy(
throttle,
requireNotNull(actionExecutionScope) { "Action execution scope is null" }
)
}
Expand All @@ -104,17 +80,16 @@ data class ActionExecutionPolicy(
}

/**
* The default [ActionExecutionPolicy] configuration.
* The default [ActionExecutionPolicy] configuration for Bucket-Level Monitors.
*
* This is currently only used by Bucket-Level Monitors and was configured with that in mind.
* If Query-Level Monitors integrate the use of [ActionExecutionPolicy] then a separate default configuration
* might need to be made depending on the desired behavior.
* will need to be made depending on the desired behavior.
*/
fun getDefaultConfiguration(): ActionExecutionPolicy {
fun getDefaultConfigurationForBucketLevelMonitor(): ActionExecutionPolicy {
val defaultActionExecutionScope = PerAlertActionScope(
actionableAlerts = setOf(AlertCategory.DEDUPED, AlertCategory.NEW)
)
return ActionExecutionPolicy(throttle = null, actionExecutionScope = defaultActionExecutionScope)
return ActionExecutionPolicy(actionExecutionScope = defaultActionExecutionScope)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.ActionExecutionPolicy
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.commons.authuser.User
Expand Down Expand Up @@ -153,8 +153,17 @@ fun Monitor.isBucketLevelMonitor(): Boolean = this.monitorType == Monitor.Monito
*/
fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinToString(separator = "#")

fun Action.getActionScope(): ActionExecutionScope.Type =
this.actionExecutionPolicy.actionExecutionScope.getExecutionScope()
fun Action.getActionExecutionPolicy(monitor: Monitor): ActionExecutionPolicy? {
// When the ActionExecutionPolicy is null for an Action, the default is resolved at runtime
// so it can be chosen based on the Monitor type at that time.
// The Action config is not aware of the Monitor type which is why the default was not stored during
// the parse.
return this.actionExecutionPolicy ?: if (monitor.isBucketLevelMonitor()) {
ActionExecutionPolicy.getDefaultConfigurationForBucketLevelMonitor()
} else {
null
}
}

fun BucketLevelTriggerRunResult.getCombinedTriggerRunResult(
prevTriggerRunResult: BucketLevelTriggerRunResult?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1285,10 +1285,10 @@ class MonitorRunnerIT : AlertingRestTestCase() {
params.docCount > 1
""".trimIndent()

val action = randomAction(
val action = randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
actionExecutionPolicy = ActionExecutionPolicy(null, PerExecutionActionScope())
actionExecutionPolicy = ActionExecutionPolicy(PerExecutionActionScope())
)
var trigger = randomBucketLevelTrigger(actions = listOf(action))
trigger = trigger.copy(
Expand Down Expand Up @@ -1352,10 +1352,10 @@ class MonitorRunnerIT : AlertingRestTestCase() {
params.docCount > 1
""".trimIndent()

val action = randomAction(
val action = randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
actionExecutionPolicy = ActionExecutionPolicy(null, PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW)))
actionExecutionPolicy = ActionExecutionPolicy(PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW)))
)
var trigger = randomBucketLevelTrigger(actions = listOf(action))
trigger = trigger.copy(
Expand Down Expand Up @@ -1419,21 +1419,21 @@ class MonitorRunnerIT : AlertingRestTestCase() {
params.docCount > 0
""".trimIndent()

val actionThrottleEnabled = randomAction(
val actionThrottleEnabled = randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
throttleEnabled = true,
throttle = Throttle(value = 5, unit = MINUTES),
actionExecutionPolicy = ActionExecutionPolicy(
throttle = Throttle(value = 5, unit = MINUTES),
actionExecutionScope = PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW))
)
)
val actionThrottleNotEnabled = randomAction(
val actionThrottleNotEnabled = randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
throttleEnabled = false,
throttle = Throttle(value = 5, unit = MINUTES),
actionExecutionPolicy = ActionExecutionPolicy(
throttle = Throttle(value = 5, unit = MINUTES),
actionExecutionScope = PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW))
)
)
Expand Down
Loading