diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index c8cf9fa04..01be538d0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -58,6 +58,7 @@ import org.opensearch.alerting.model.action.Action.Companion.SUBJECT import org.opensearch.alerting.model.action.ActionExecutionScope import org.opensearch.alerting.model.action.AlertCategory import org.opensearch.alerting.model.action.PerAlertActionScope +import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext @@ -74,7 +75,7 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings.Companion.HOST_DENY_LIST_NONE -import org.opensearch.alerting.util.getActionScope +import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.util.getCombinedTriggerRunResult import org.opensearch.alerting.util.isADMonitor @@ -486,9 +487,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { monitorOrTriggerError = monitorOrTriggerError ) for (action in trigger.actions) { - if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && !shouldDefaultToPerExecution) { - val perAlertActionScope = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope - for (alertCategory in perAlertActionScope.actionableAlerts) { + // ActionExecutionPolicy should not be null for Bucket-Level Monitors since it has a default config when not set explicitly + val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope + if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) { + for (alertCategory in actionExecutionScope.actionableAlerts) { val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() for (alert in alertsToExecuteActionsFor) { val actionCtx = getActionContextForAlertCategory( @@ -502,11 +504,12 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { // Keeping the throttled response separate from runAction for now since // throttling is not supported for PER_EXECUTION - val actionResult = if (isBucketLevelTriggerActionThrottled(action, alert)) { - ActionRunResult(action.id, action.name, mapOf(), true, null, null) - } else { + val actionResult = if (isActionActionable(action, alert)) { runAction(action, actionCtx, dryrun) + } else { + ActionRunResult(action.id, action.name, mapOf(), true, null, null) } + triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult) alertsToUpdate.add(alert) // Remove the alert from completedAlertsToUpdate in case it is present there since @@ -514,7 +517,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { completedAlertsToUpdate.remove(alert) } } - } else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || shouldDefaultToPerExecution) { + } else if (actionExecutionScope is PerExecutionActionScope || shouldDefaultToPerExecution) { // If all categories of Alerts are empty, there is nothing to message on and we can skip the Action. // If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified. if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue @@ -638,23 +641,6 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { return true } - // TODO: Add unit test for this method (or at least cover it in MonitorRunnerIT) - // Bucket-Level Monitors use the throttle configurations defined in ActionExecutionPolicy, this method evaluates that configuration. - private fun isBucketLevelTriggerActionThrottled(action: Action, alert: Alert): Boolean { - if (action.actionExecutionPolicy.throttle == null) return false - // TODO: This will need to be updated if throttleEnabled is moved to ActionExecutionPolicy - if (action.throttleEnabled) { - val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id } - val lastExecutionTime: Instant? = result?.lastExecutionTime - val throttledTimeBound = currentTime().minus( - action.actionExecutionPolicy.throttle.value.toLong(), - action.actionExecutionPolicy.throttle.unit - ) - return !(lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound)) - } - return false - } - private fun getActionContextForAlertCategory( alertCategory: AlertCategory, alert: Alert, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Action.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Action.kt index 2d7b02733..e3f030175 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Action.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Action.kt @@ -49,7 +49,7 @@ data class Action( val throttleEnabled: Boolean, val throttle: Throttle?, val id: String = UUIDs.base64UUID(), - val actionExecutionPolicy: ActionExecutionPolicy = ActionExecutionPolicy.getDefaultConfiguration() + val actionExecutionPolicy: ActionExecutionPolicy? = null ) : Writeable, ToXContentObject { init { @@ -57,6 +57,10 @@ data class Action( require(subjectTemplate.lang == MUSTACHE) { "subject_template must be a mustache script" } } require(messageTemplate.lang == MUSTACHE) { "message_template must be a mustache script" } + + if (actionExecutionPolicy?.actionExecutionScope is PerExecutionActionScope) { + require(throttle == null) { "Throttle is currently not supported for per execution action scope" } + } } @Throws(IOException::class) @@ -68,7 +72,7 @@ data class Action( sin.readBoolean(), // throttleEnabled sin.readOptionalWriteable(::Throttle), // throttle sin.readString(), // id - ActionExecutionPolicy(sin) // actionExecutionPolicy + sin.readOptionalWriteable(::ActionExecutionPolicy) // actionExecutionPolicy ) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -78,13 +82,15 @@ data class Action( .field(DESTINATION_ID_FIELD, destinationId) .field(MESSAGE_TEMPLATE_FIELD, messageTemplate) .field(THROTTLE_ENABLED_FIELD, throttleEnabled) - .field(ACTION_EXECUTION_POLICY_FIELD, actionExecutionPolicy) if (subjectTemplate != null) { xContentBuilder.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate) } if (throttle != null) { xContentBuilder.field(THROTTLE_FIELD, throttle) } + if (actionExecutionPolicy != null) { + xContentBuilder.field(ACTION_EXECUTION_POLICY_FIELD, actionExecutionPolicy) + } return xContentBuilder.endObject() } @@ -111,7 +117,12 @@ data class Action( out.writeBoolean(false) } out.writeString(id) - actionExecutionPolicy.writeTo(out) + if (actionExecutionPolicy != null) { + out.writeBoolean(true) + actionExecutionPolicy.writeTo(out) + } else { + out.writeBoolean(false) + } } companion object { @@ -138,7 +149,7 @@ data class Action( lateinit var messageTemplate: Script var throttleEnabled = false var throttle: Throttle? = null - var actionExecutionPolicy: ActionExecutionPolicy = ActionExecutionPolicy.getDefaultConfiguration() + var actionExecutionPolicy: ActionExecutionPolicy? = null XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -160,7 +171,11 @@ data class Action( throttleEnabled = xcp.booleanValue() } ACTION_EXECUTION_POLICY_FIELD -> { - actionExecutionPolicy = ActionExecutionPolicy.parse(xcp) + actionExecutionPolicy = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + ActionExecutionPolicy.parse(xcp) + } } else -> { throw IllegalStateException("Unexpected field: $fieldName, while parsing action") @@ -180,7 +195,7 @@ data class Action( throttleEnabled, throttle, id = requireNotNull(id), - actionExecutionPolicy = requireNotNull(actionExecutionPolicy) { "Action execution policy is null" } + actionExecutionPolicy = actionExecutionPolicy ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/action/ActionExecutionPolicy.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/action/ActionExecutionPolicy.kt index a5eaa7a43..23bc294e0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/action/ActionExecutionPolicy.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/action/ActionExecutionPolicy.kt @@ -25,41 +25,23 @@ import java.io.IOException /** * This class represents the container for various configurations which control Action behavior. */ -// TODO: Should throttleEnabled be included in here as well? data class ActionExecutionPolicy( - val throttle: Throttle? = null, val actionExecutionScope: ActionExecutionScope ) : Writeable, ToXContentObject { - init { - if (actionExecutionScope is PerExecutionActionScope) { - require(throttle == null) { "Throttle is currently not supported for per execution action scope" } - } - } - @Throws(IOException::class) constructor(sin: StreamInput) : this ( - sin.readOptionalWriteable(::Throttle), // throttle ActionExecutionScope.readFrom(sin) // actionExecutionScope ) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - val xContentBuilder = builder.startObject() - if (throttle != null) { - xContentBuilder.field(THROTTLE_FIELD, throttle) - } - xContentBuilder.field(ACTION_EXECUTION_SCOPE, actionExecutionScope) - return xContentBuilder.endObject() + builder.startObject() + .field(ACTION_EXECUTION_SCOPE, actionExecutionScope) + return builder.endObject() } @Throws(IOException::class) override fun writeTo(out: StreamOutput) { - if (throttle != null) { - out.writeBoolean(true) - throttle.writeTo(out) - } else { - out.writeBoolean(false) - } if (actionExecutionScope is PerAlertActionScope) { out.writeEnum(ActionExecutionScope.Type.PER_ALERT) } else { @@ -69,13 +51,11 @@ data class ActionExecutionPolicy( } companion object { - const val THROTTLE_FIELD = "throttle" const val ACTION_EXECUTION_SCOPE = "action_execution_scope" @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser): ActionExecutionPolicy { - var throttle: Throttle? = null lateinit var actionExecutionScope: ActionExecutionScope ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) @@ -84,15 +64,11 @@ data class ActionExecutionPolicy( xcp.nextToken() when (fieldName) { - THROTTLE_FIELD -> { - throttle = if (xcp.currentToken() == Token.VALUE_NULL) null else Throttle.parse(xcp) - } ACTION_EXECUTION_SCOPE -> actionExecutionScope = ActionExecutionScope.parse(xcp) } } return ActionExecutionPolicy( - throttle, requireNotNull(actionExecutionScope) { "Action execution scope is null" } ) } @@ -104,17 +80,16 @@ data class ActionExecutionPolicy( } /** - * The default [ActionExecutionPolicy] configuration. + * The default [ActionExecutionPolicy] configuration for Bucket-Level Monitors. * - * This is currently only used by Bucket-Level Monitors and was configured with that in mind. * If Query-Level Monitors integrate the use of [ActionExecutionPolicy] then a separate default configuration - * might need to be made depending on the desired behavior. + * will need to be made depending on the desired behavior. */ - fun getDefaultConfiguration(): ActionExecutionPolicy { + fun getDefaultConfigurationForBucketLevelMonitor(): ActionExecutionPolicy { val defaultActionExecutionScope = PerAlertActionScope( actionableAlerts = setOf(AlertCategory.DEDUPED, AlertCategory.NEW) ) - return ActionExecutionPolicy(throttle = null, actionExecutionScope = defaultActionExecutionScope) + return ActionExecutionPolicy(actionExecutionScope = defaultActionExecutionScope) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 12fe034e7..61f4dd9e0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -34,7 +34,7 @@ import org.opensearch.alerting.model.AggregationResultBucket import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.action.Action -import org.opensearch.alerting.model.action.ActionExecutionScope +import org.opensearch.alerting.model.action.ActionExecutionPolicy import org.opensearch.alerting.model.destination.Destination import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.commons.authuser.User @@ -153,8 +153,17 @@ fun Monitor.isBucketLevelMonitor(): Boolean = this.monitorType == Monitor.Monito */ fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinToString(separator = "#") -fun Action.getActionScope(): ActionExecutionScope.Type = - this.actionExecutionPolicy.actionExecutionScope.getExecutionScope() +fun Action.getActionExecutionPolicy(monitor: Monitor): ActionExecutionPolicy? { + // When the ActionExecutionPolicy is null for an Action, the default is resolved at runtime + // so it can be chosen based on the Monitor type at that time. + // The Action config is not aware of the Monitor type which is why the default was not stored during + // the parse. + return this.actionExecutionPolicy ?: if (monitor.isBucketLevelMonitor()) { + ActionExecutionPolicy.getDefaultConfigurationForBucketLevelMonitor() + } else { + null + } +} fun BucketLevelTriggerRunResult.getCombinedTriggerRunResult( prevTriggerRunResult: BucketLevelTriggerRunResult? diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt index e8d6c10dc..6194191e6 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt @@ -1285,10 +1285,10 @@ class MonitorRunnerIT : AlertingRestTestCase() { params.docCount > 1 """.trimIndent() - val action = randomAction( + val action = randomActionWithPolicy( template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id, - actionExecutionPolicy = ActionExecutionPolicy(null, PerExecutionActionScope()) + actionExecutionPolicy = ActionExecutionPolicy(PerExecutionActionScope()) ) var trigger = randomBucketLevelTrigger(actions = listOf(action)) trigger = trigger.copy( @@ -1352,10 +1352,10 @@ class MonitorRunnerIT : AlertingRestTestCase() { params.docCount > 1 """.trimIndent() - val action = randomAction( + val action = randomActionWithPolicy( template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id, - actionExecutionPolicy = ActionExecutionPolicy(null, PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW))) + actionExecutionPolicy = ActionExecutionPolicy(PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW))) ) var trigger = randomBucketLevelTrigger(actions = listOf(action)) trigger = trigger.copy( @@ -1419,21 +1419,21 @@ class MonitorRunnerIT : AlertingRestTestCase() { params.docCount > 0 """.trimIndent() - val actionThrottleEnabled = randomAction( + val actionThrottleEnabled = randomActionWithPolicy( template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id, throttleEnabled = true, + throttle = Throttle(value = 5, unit = MINUTES), actionExecutionPolicy = ActionExecutionPolicy( - throttle = Throttle(value = 5, unit = MINUTES), actionExecutionScope = PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW)) ) ) - val actionThrottleNotEnabled = randomAction( + val actionThrottleNotEnabled = randomActionWithPolicy( template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id, throttleEnabled = false, + throttle = Throttle(value = 5, unit = MINUTES), actionExecutionPolicy = ActionExecutionPolicy( - throttle = Throttle(value = 5, unit = MINUTES), actionExecutionScope = PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW)) ) ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index e1de5dba4..c576ba104 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -175,7 +175,7 @@ fun randomBucketLevelTrigger( name = name, severity = severity, bucketSelector = bucketSelector, - actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions + actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomActionWithPolicy(destinationId = destinationId) } else actions ) } @@ -239,13 +239,28 @@ fun randomTemplateScript( ): Script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, source, params) fun randomAction( + name: String = OpenSearchRestTestCase.randomUnicodeOfLength(10), + template: Script = randomTemplateScript("Hello World"), + destinationId: String = "", + throttleEnabled: Boolean = false, + throttle: Throttle = randomThrottle() +) = Action(name, destinationId, template, template, throttleEnabled, throttle, actionExecutionPolicy = null) + +fun randomActionWithPolicy( name: String = OpenSearchRestTestCase.randomUnicodeOfLength(10), template: Script = randomTemplateScript("Hello World"), destinationId: String = "", throttleEnabled: Boolean = false, throttle: Throttle = randomThrottle(), - actionExecutionPolicy: ActionExecutionPolicy = randomActionExecutionPolicy() -) = Action(name, destinationId, template, template, throttleEnabled, throttle, actionExecutionPolicy = actionExecutionPolicy) + actionExecutionPolicy: ActionExecutionPolicy? = randomActionExecutionPolicy() +): Action { + return if (actionExecutionPolicy?.actionExecutionScope is PerExecutionActionScope) { + // Return null for throttle when using PerExecutionActionScope since throttling is currently not supported for it + Action(name, destinationId, template, template, throttleEnabled, null, actionExecutionPolicy = actionExecutionPolicy) + } else { + Action(name, destinationId, template, template, throttleEnabled, throttle, actionExecutionPolicy = actionExecutionPolicy) + } +} fun randomThrottle( value: Int = randomIntBetween(60, 120), @@ -253,16 +268,8 @@ fun randomThrottle( ) = Throttle(value, unit) fun randomActionExecutionPolicy( - throttle: Throttle = randomThrottle(), actionExecutionScope: ActionExecutionScope = randomActionExecutionScope() -): ActionExecutionPolicy { - return if (actionExecutionScope is PerExecutionActionScope) { - // Return null for throttle when using PerExecutionActionScope since throttling is currently not supported for it - ActionExecutionPolicy(null, actionExecutionScope) - } else { - ActionExecutionPolicy(throttle, actionExecutionScope) - } -} +) = ActionExecutionPolicy(actionExecutionScope) fun randomActionExecutionScope(): ActionExecutionScope { return if (randomBoolean()) { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt index db38de381..3268309d1 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt @@ -31,6 +31,7 @@ import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.action.Action import org.opensearch.alerting.model.action.ActionExecutionPolicy +import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup @@ -38,6 +39,7 @@ import org.opensearch.alerting.parser import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomActionExecutionPolicy import org.opensearch.alerting.randomActionExecutionResult +import org.opensearch.alerting.randomActionWithPolicy import org.opensearch.alerting.randomAlert import org.opensearch.alerting.randomBucketLevelMonitor import org.opensearch.alerting.randomBucketLevelTrigger @@ -55,6 +57,7 @@ import org.opensearch.commons.authuser.User import org.opensearch.index.query.QueryBuilders import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.OpenSearchTestCase +import java.time.temporal.ChronoUnit import kotlin.test.assertFailsWith class XContentTests : OpenSearchTestCase() { @@ -88,6 +91,18 @@ class XContentTests : OpenSearchTestCase() { } } + fun `test action with per execution scope does not support throttling`() { + try { + val action = randomActionWithPolicy().copy( + throttleEnabled = true, + throttle = Throttle(value = 5, unit = ChronoUnit.MINUTES), + actionExecutionPolicy = ActionExecutionPolicy(PerExecutionActionScope()) + ) + fail("Creating an action with per execution scope and throttle enabled did not fail.") + } catch (ignored: IllegalArgumentException) { + } + } + fun `test throttle parsing`() { val throttle = randomThrottle() val throttleString = throttle.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() @@ -342,11 +357,4 @@ class XContentTests : OpenSearchTestCase() { val parsedActionExecutionPolicy = ActionExecutionPolicy.parse(parser(actionExecutionPolicyString)) assertEquals("Round tripping ActionExecutionPolicy doesn't work", actionExecutionPolicy, parsedActionExecutionPolicy) } - - fun `test action execution policy with null throttle`() { - val actionExecutionPolicy = randomActionExecutionPolicy().copy(throttle = null) - val actionExecutionPolicyString = actionExecutionPolicy.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() - val parsedActionExecutionPolicy = ActionExecutionPolicy.parse(parser(actionExecutionPolicyString)) - assertEquals("Round tripping ActionExecutionPolicy doesn't work", actionExecutionPolicy, parsedActionExecutionPolicy) - } }