Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,27 @@ data class Action(
val messageTemplate: Script,
val throttleEnabled: Boolean,
val throttle: Throttle?,
val id: String = UUIDs.base64UUID()
val id: String = UUIDs.base64UUID(),
val actionExecutionPolicy: ActionExecutionPolicy = ActionExecutionPolicy.getDefaultConfiguration()
) : Writeable, ToXContentObject {

init {
if (subjectTemplate != null) {
require(subjectTemplate.lang == Action.MUSTACHE) { "subject_template must be a mustache script" }
require(subjectTemplate.lang == MUSTACHE) { "subject_template must be a mustache script" }
}
require(messageTemplate.lang == Action.MUSTACHE) { "message_template must be a mustache script" }
require(messageTemplate.lang == MUSTACHE) { "message_template must be a mustache script" }
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
constructor(sin: StreamInput): this(
sin.readString(), // name
sin.readString(), // destinationId
sin.readOptionalWriteable(::Script), // subjectTemplate
Script(sin), // messageTemplate
sin.readBoolean(), // throttleEnabled
sin.readOptionalWriteable(::Throttle), // throttle
sin.readString() // id
sin.readString(), // id
ActionExecutionPolicy(sin) // actionExecutionPolicy
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -76,6 +78,7 @@ data class Action(
.field(DESTINATION_ID_FIELD, destinationId)
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.field(THROTTLE_ENABLED_FIELD, throttleEnabled)
.field(ACTION_EXECUTION_POLICY_FIELD, actionExecutionPolicy)
if (subjectTemplate != null) {
xContentBuilder.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate)
}
Expand Down Expand Up @@ -108,6 +111,7 @@ data class Action(
out.writeBoolean(false)
}
out.writeString(id)
actionExecutionPolicy.writeTo(out)
}

companion object {
Expand All @@ -118,6 +122,7 @@ data class Action(
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val THROTTLE_ENABLED_FIELD = "throttle_enabled"
const val THROTTLE_FIELD = "throttle"
const val ACTION_EXECUTION_POLICY_FIELD = "action_execution_policy"
const val MUSTACHE = "mustache"
const val SUBJECT = "subject"
const val MESSAGE = "message"
Expand All @@ -133,6 +138,7 @@ data class Action(
lateinit var messageTemplate: Script
var throttleEnabled = false
var throttle: Throttle? = null
var actionExecutionPolicy: ActionExecutionPolicy = ActionExecutionPolicy.getDefaultConfiguration()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -153,7 +159,9 @@ data class Action(
THROTTLE_ENABLED_FIELD -> {
throttleEnabled = xcp.booleanValue()
}

ACTION_EXECUTION_POLICY_FIELD -> {
actionExecutionPolicy = ActionExecutionPolicy.parse(xcp)
}
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing action")
}
Expand All @@ -171,7 +179,8 @@ data class Action(
requireNotNull(messageTemplate) { "Action message template is null" },
throttleEnabled,
throttle,
id = requireNotNull(id)
id = requireNotNull(id),
actionExecutionPolicy = requireNotNull(actionExecutionPolicy) { "Action execution policy is null" }

@rishabhmaurya rishabhmaurya Aug 23, 2021

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens to old Actions which doesn't have the actionExecutionPolicy? Is this change backward compatible?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So Query-Level Monitors don't actually use the ActionExecutionPolicy as of now. If we make the change in the future to allow it, we'd update the default based on Monitor type to be compatible with both at that time.

@rishabhmaurya rishabhmaurya Aug 24, 2021

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be on a safer side, we should try to keep the default behavior for existing query-level monitors as it is i.e. to not add any default field for actionExecutionPolicy in the data class for query-level monitors. This will ensure both forward and backward compatibility issues.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense. I'll merge in this change for now since it doesn't reflect the full usage of ActionExecutionPolicy. Once we get to the PR with the changes to MonitorRunner, I think we can update actionExecutionPolicy to be nullable and set the default configuration at runtime if it is not set.

)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting.model.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

/**
* This class represents the container for various configurations which control Action behavior.
*/
// TODO: Should throttleEnabled be included in here as well?
data class ActionExecutionPolicy(
val throttle: Throttle? = null,
val actionExecutionScope: ActionExecutionScope
) : Writeable, ToXContentObject {

@Throws(IOException::class)
constructor(sin: StreamInput) : this (
sin.readOptionalWriteable(::Throttle), // throttle
ActionExecutionScope.readFrom(sin) // actionExecutionScope
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val xContentBuilder = builder.startObject()
if (throttle != null) {
xContentBuilder.field(THROTTLE_FIELD, throttle)
}
xContentBuilder.field(ACTION_EXECUTION_SCOPE, actionExecutionScope)
return xContentBuilder.endObject()
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
if (throttle != null) {
out.writeBoolean(true)
throttle.writeTo(out)
} else {
out.writeBoolean(false)
}
if (actionExecutionScope is PerAlertActionScope) {
out.writeEnum(ActionExecutionScope.Type.PER_ALERT)
} else {
out.writeEnum(ActionExecutionScope.Type.PER_EXECUTION)
}
actionExecutionScope.writeTo(out)
}

companion object {
const val THROTTLE_FIELD = "throttle"
const val ACTION_EXECUTION_SCOPE = "action_execution_scope"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ActionExecutionPolicy {
var throttle: Throttle? = null
lateinit var actionExecutionScope: ActionExecutionScope

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
THROTTLE_FIELD -> {
throttle = if (xcp.currentToken() == Token.VALUE_NULL) null else Throttle.parse(xcp)
}
ACTION_EXECUTION_SCOPE -> actionExecutionScope = ActionExecutionScope.parse(xcp)
}
}

return ActionExecutionPolicy(
throttle,
requireNotNull(actionExecutionScope) { "Action execution scope is null" }
)
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): ActionExecutionPolicy {
return ActionExecutionPolicy(sin)
}

/**
* The default [ActionExecutionPolicy] configuration.
*
* This is currently only used by Bucket-Level Monitors and was configured with that in mind.
* If Query-Level Monitors integrate the use of [ActionExecutionPolicy] then a separate default configuration
* might need to be made depending on the desired behavior.
*/
fun getDefaultConfiguration(): ActionExecutionPolicy {
val defaultActionExecutionScope = PerAlertActionScope(
actionableAlerts = setOf(AlertCategory.DEDUPED, AlertCategory.NEW)
)
return ActionExecutionPolicy(throttle = null, actionExecutionScope = defaultActionExecutionScope)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting.model.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException
import java.lang.IllegalArgumentException

/**
* This class represents configurations used to control the scope of Action executions when Alerts are created.
*/
sealed class ActionExecutionScope : Writeable, ToXContentObject {

enum class Type { PER_ALERT, PER_EXECUTION }

companion object {
const val PER_ALERT_FIELD = "per_alert"
const val PER_EXECUTION_FIELD = "per_execution"
const val ACTIONABLE_ALERTS_FIELD = "actionable_alerts"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ActionExecutionScope {
var type: Type? = null
var actionExecutionScope: ActionExecutionScope? = null
val alertFilter = mutableSetOf<AlertCategory>()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

// If the type field has already been set, the user has provided more than one type of schedule
if (type != null) {
throw IllegalArgumentException("You can only specify one type of action execution scope.")
}

when (fieldName) {
PER_ALERT_FIELD -> {
type = Type.PER_ALERT
while (xcp.nextToken() != Token.END_OBJECT) {
val perAlertFieldName = xcp.currentName()
xcp.nextToken()
when (perAlertFieldName) {
ACTIONABLE_ALERTS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
val allowedCategories = AlertCategory.values().map { it.toString() }
while (xcp.nextToken() != Token.END_ARRAY) {
val alertCategory = xcp.text()
if (!allowedCategories.contains(alertCategory)) {
throw IllegalStateException("Actionable alerts should be one of $allowedCategories")
}
alertFilter.add(AlertCategory.valueOf(alertCategory))
}
}
Comment thread
qreshi marked this conversation as resolved.
}
}
}
PER_EXECUTION_FIELD -> {
type = Type.PER_EXECUTION
while (xcp.nextToken() != Token.END_OBJECT) {}
Comment thread
qreshi marked this conversation as resolved.
}
else -> throw IllegalArgumentException("Invalid field [$fieldName] found in action execution scope.")
}
}

if (type == Type.PER_ALERT) {
actionExecutionScope = PerAlertActionScope(alertFilter)
} else if (type == Type.PER_EXECUTION) {
actionExecutionScope = PerExecutionActionScope()
}

return requireNotNull(actionExecutionScope) { "Action execution scope is null." }
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): ActionExecutionScope {
val type = sin.readEnum(ActionExecutionScope.Type::class.java)
return if (type == Type.PER_ALERT) {
PerAlertActionScope(sin)
} else {
PerExecutionActionScope(sin)
}
}
}

abstract fun getExecutionScope(): Type
}

data class PerAlertActionScope(
val actionableAlerts: Set<AlertCategory>
) : ActionExecutionScope() {

@Throws(IOException::class)
constructor(sin: StreamInput): this(
sin.readSet { si -> si.readEnum(AlertCategory::class.java) } // alertFilter
)

override fun getExecutionScope(): Type = Type.PER_ALERT

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.startObject(PER_ALERT_FIELD)
.field(ACTIONABLE_ALERTS_FIELD, actionableAlerts.toTypedArray())
.endObject()
return builder.endObject()
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeCollection(actionableAlerts) { o, v -> o.writeEnum(v) }
}

companion object {
@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): PerAlertActionScope {
return PerAlertActionScope(sin)
}
}
}

class PerExecutionActionScope() : ActionExecutionScope() {

@Throws(IOException::class)
constructor(sin: StreamInput) : this()

override fun hashCode(): Int {
return javaClass.hashCode()
}

// Creating an equals method that just checks class type rather than reference since this is currently stateless.
// Otherwise, it would have been a dataclass which would have handled this.
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other?.javaClass != javaClass) return false
return true
}

override fun getExecutionScope(): Type = Type.PER_EXECUTION

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.startObject(PER_EXECUTION_FIELD)
.endObject()
return builder.endObject()
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {}

companion object {
@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): PerExecutionActionScope {
return PerExecutionActionScope(sin)
}
}
}

enum class AlertCategory { DEDUPED, NEW, COMPLETED }
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.alerting.destination.message.BaseMessage
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.commons.authuser.User
Expand Down Expand Up @@ -146,3 +148,6 @@ fun <T : Any> checkUserFilterByPermissions(
* as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values.
*/
fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinToString(separator = "#")

fun Action.getActionScope(): ActionExecutionScope.Type =
this.actionExecutionPolicy.actionExecutionScope.getExecutionScope()