diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 375ed2cbc..71b41494e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -27,12 +27,17 @@ import org.opensearch.alerting.elasticapi.firstFailureOrNull import org.opensearch.alerting.elasticapi.retry import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.ActionExecutionResult +import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.AggregationResultBucket import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.model.Trigger -import org.opensearch.alerting.model.TriggerRunResult -import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.model.action.AlertCategory +import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.util.IndexUtils +import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.client.Client import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.LoggingDeprecationHandler @@ -55,18 +60,19 @@ class AlertService( val alertIndices: AlertIndices ) { + companion object { + const val MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT = 500 + } + private val logger = LogManager.getLogger(AlertService::class.java) - suspend fun loadCurrentAlerts(monitor: Monitor): Map { - val request = SearchRequest(AlertIndices.ALERT_INDEX) - .routing(monitor.id) - .source(alertQuery(monitor)) - val response: SearchResponse = client.suspendUntil { client.search(request, it) } - if (response.status() != RestStatus.OK) { - throw (response.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts")) - } + suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor): Map { + val searchAlertsResponse: SearchResponse = searchAlerts( + monitorId = monitor.id, + size = monitor.triggers.size * 2 // We expect there to be only a single in-progress alert so fetch 2 to check + ) - val foundAlerts = response.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) } + val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) } .groupBy { it.triggerId } foundAlerts.values.forEach { alerts -> if (alerts.size > 1) { @@ -74,12 +80,34 @@ class AlertService( } } - return monitor.triggers.associate { trigger -> - trigger to (foundAlerts[trigger.id]?.firstOrNull()) + return monitor.triggers.associateWith { trigger -> + foundAlerts[trigger.id]?.firstOrNull() + } + } + + suspend fun loadCurrentAlertsForBucketLevelMonitor(monitor: Monitor): Map> { + val searchAlertsResponse: SearchResponse = searchAlerts( + monitorId = monitor.id, + // TODO: This should be limited based on a circuit breaker that limits Alerts + size = MAX_BUCKET_LEVEL_MONITOR_ALERT_SEARCH_COUNT + ) + + val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) } + .groupBy { it.triggerId } + + return monitor.triggers.associateWith { trigger -> + // Default to an empty map if there are no Alerts found for a Trigger to make Alert categorization logic easier + (foundAlerts[trigger.id]?.mapNotNull { alert -> + alert.aggregationResultBucket?.let { it.getBucketKeysHash() to alert } + }?.toMap() ?: mutableMapOf()) as MutableMap } } - fun composeAlert(ctx: TriggerExecutionContext, result: TriggerRunResult, alertError: AlertError?): Alert? { + fun composeQueryLevelAlert( + ctx: QueryLevelTriggerExecutionContext, + result: QueryLevelTriggerRunResult, + alertError: AlertError? + ): Alert? { val currentTime = Instant.now() val currentAlert = ctx.alert @@ -100,11 +128,11 @@ class AlertService( } } // add action execution results which not exist in current alert - updatedActionExecutionResults.addAll(result.actionResults.filter { it -> !currentActionIds.contains(it.key) } - .map { it -> ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }) + updatedActionExecutionResults.addAll(result.actionResults.filter { !currentActionIds.contains(it.key) } + .map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }) } else { - updatedActionExecutionResults.addAll(result.actionResults.map { it -> ActionExecutionResult(it.key, it.value.executionTime, - if (it.value.throttled) 1 else 0) }) + updatedActionExecutionResults.addAll(result.actionResults.map { + ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }) } // Merge the alert's error message to the current alert's history @@ -129,6 +157,92 @@ class AlertService( } } + fun updateActionResultsForBucketLevelAlert( + currentAlert: Alert, + actionResults: Map, + alertError: AlertError? + ): Alert { + val updatedActionExecutionResults = mutableListOf() + val currentActionIds = mutableSetOf() + // Update alert's existing action execution results + for (actionExecutionResult in currentAlert.actionExecutionResults) { + val actionId = actionExecutionResult.actionId + currentActionIds.add(actionId) + val actionRunResult = actionResults[actionId] + when { + actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult) + actionRunResult.throttled -> + updatedActionExecutionResults.add(actionExecutionResult.copy( + throttledCount = actionExecutionResult.throttledCount + 1)) + else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime)) + } + } + + // Add action execution results not currently present in the alert + updatedActionExecutionResults.addAll( + actionResults.filter { !currentActionIds.contains(it.key) } + .map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) } + ) + + val updatedErrorHistory = currentAlert.errorHistory.update(alertError) + return if (alertError == null) { + currentAlert.copy(errorHistory = updatedErrorHistory, actionExecutionResults = updatedActionExecutionResults) + } else { + currentAlert.copy( + state = Alert.State.ERROR, + errorMessage = alertError.message, + errorHistory = updatedErrorHistory, + actionExecutionResults = updatedActionExecutionResults + ) + } + } + + // TODO: Can change the parameters to use ctx: BucketLevelTriggerExecutionContext instead of monitor/trigger and + // result: AggTriggerRunResult for aggResultBuckets + // TODO: Can refactor this method to use Sets instead which can cleanup some of the categorization logic (like getting completed alerts) + fun getCategorizedAlertsForBucketLevelMonitor( + monitor: Monitor, + trigger: BucketLevelTrigger, + currentAlerts: MutableMap, + aggResultBuckets: List + ): Map> { + val dedupedAlerts = mutableListOf() + val newAlerts = mutableListOf() + val currentTime = Instant.now() + + aggResultBuckets.forEach { aggAlertBucket -> + val currentAlert = currentAlerts[aggAlertBucket.getBucketKeysHash()] + if (currentAlert != null) { + // De-duped Alert + dedupedAlerts.add(currentAlert.copy(lastNotificationTime = currentTime, aggregationResultBucket = aggAlertBucket)) + + // Remove de-duped Alert from currentAlerts since it is no longer a candidate for a potentially completed Alert + currentAlerts.remove(aggAlertBucket.getBucketKeysHash()) + } else { + // New Alert + // TODO: Setting lastNotificationTime is deceiving since the actions haven't run yet, maybe it should be null here + val newAlert = Alert(monitor = monitor, trigger = trigger, startTime = currentTime, + lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null, + errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(), + schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket) + newAlerts.add(newAlert) + } + } + + return mapOf( + AlertCategory.DEDUPED to dedupedAlerts, + AlertCategory.NEW to newAlerts + ) + } + + fun convertToCompletedAlerts(currentAlerts: Map?): List { + val currentTime = Instant.now() + return currentAlerts?.map { + it.value.copy(state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null, + schemaVersion = IndexUtils.alertIndexSchemaVersion) + } ?: listOf() + } + suspend fun saveAlerts(alerts: List, retryPolicy: BackoffPolicy) { var requestsToRetry = alerts.flatMap { alert -> // We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts. @@ -177,6 +291,68 @@ class AlertService( } } + /** + * This is a separate method created specifically for saving new Alerts during the Bucket-Level Monitor run. + * Alerts are saved in two batches during the execution of an Bucket-Level Monitor, once before the Actions are executed + * and once afterwards. This method saves Alerts to the [AlertIndices.ALERT_INDEX] but returns the same Alerts with their document IDs. + * + * The Alerts are required with their indexed ID so that when the new Alerts are updated after the Action execution, + * the ID is available for the index request so that the existing Alert can be updated, instead of creating a duplicate Alert document. + */ + suspend fun saveNewAlerts(alerts: List, retryPolicy: BackoffPolicy): List { + val savedAlerts = mutableListOf() + var alertsBeingIndexed = alerts + var requestsToRetry: MutableList = alerts.map { alert -> + if (alert.state != Alert.State.ACTIVE) { + throw IllegalStateException("Unexpected attempt to save new alert [$alert] with state [${alert.state}]") + } + if (alert.id != Alert.NO_ID) { + throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]") + } + IndexRequest(AlertIndices.ALERT_INDEX) + .routing(alert.monitorId) + .source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + }.toMutableList() + + if (requestsToRetry.isEmpty()) return listOf() + + // Retry Bulk requests if there was any 429 response. + // The responses of a bulk request will be in the same order as the individual requests. + // If the index request succeeded for an Alert, the document ID from the response is taken and saved in the Alert. + // If the index request is to be retried, the Alert is saved separately as well so that its relative ordering is maintained in + // relation to index request in the retried bulk request for when it eventually succeeds. + retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { + val bulkRequest = BulkRequest().add(requestsToRetry) + val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) } + // TODO: This is only used to retrieve the retryCause, could instead fetch it from the bulkResponse iteration below + val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed } + + requestsToRetry = mutableListOf() + val alertsBeingRetried = mutableListOf() + bulkResponse.items.forEach { item -> + if (item.isFailed) { + // TODO: What if the failure cause was not TOO_MANY_REQUESTS, should these be saved and logged? + if (item.status() == RestStatus.TOO_MANY_REQUESTS) { + requestsToRetry.add(bulkRequest.requests()[item.itemId] as IndexRequest) + alertsBeingRetried.add(alertsBeingIndexed[item.itemId]) + } + } else { + // The ID of the BulkItemResponse in this case is the document ID resulting from the DocWriteRequest operation + savedAlerts.add(alertsBeingIndexed[item.itemId].copy(id = item.id)) + } + } + + alertsBeingIndexed = alertsBeingRetried + + if (requestsToRetry.isNotEmpty()) { + val retryCause = failedResponses.first { it.status() == RestStatus.TOO_MANY_REQUESTS }.failure.cause + throw ExceptionsHelper.convertToOpenSearchException(retryCause) + } + } + + return savedAlerts + } + private fun contentParser(bytesReference: BytesReference): XContentParser { val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON) @@ -184,10 +360,29 @@ class AlertService( return xcp } - private fun alertQuery(monitor: Monitor): SearchSourceBuilder { - return SearchSourceBuilder.searchSource() - .size(monitor.triggers.size * 2) // We expect there to be only a single in-progress alert so fetch 2 to check - .query(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id)) + /** + * Searches for Alerts in the [AlertIndices.ALERT_INDEX]. + * + * @param monitorId The Monitor to get Alerts for + * @param size The number of search hits (Alerts) to return + */ + private suspend fun searchAlerts(monitorId: String, size: Int): SearchResponse { + val queryBuilder = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId)) + + val searchSourceBuilder = SearchSourceBuilder() + .size(size) + .query(queryBuilder) + + val searchRequest = SearchRequest(AlertIndices.ALERT_INDEX) + .routing(monitorId) + .source(searchSourceBuilder) + val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + if (searchResponse.status() != RestStatus.OK) { + throw (searchResponse.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts")) + } + + return searchResponse } private fun List?.update(alertError: AlertError?): List { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt new file mode 100644 index 000000000..e4c65b1c3 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt @@ -0,0 +1,189 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting + +import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.model.AggregationResultBucket +import org.opensearch.alerting.model.BucketLevelTrigger +import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.action.AlertCategory +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.Version +import org.opensearch.client.Client +import org.opensearch.cluster.node.DiscoveryNode +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.ClusterSettings +import org.opensearch.common.settings.Setting +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.test.ClusterServiceUtils +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.threadpool.ThreadPool +import org.junit.Before +import org.mockito.Mockito +import java.time.Instant +import java.time.temporal.ChronoUnit + +class AlertServiceTests : OpenSearchTestCase() { + + private lateinit var client: Client + private lateinit var xContentRegistry: NamedXContentRegistry + private lateinit var settings: Settings + private lateinit var threadPool: ThreadPool + private lateinit var clusterService: ClusterService + + private lateinit var alertIndices: AlertIndices + private lateinit var alertService: AlertService + + @Before + fun setup() { + // TODO: If more *Service unit tests are added, this configuration can be moved to some base class for each service test class to use + client = Mockito.mock(Client::class.java) + xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java) + threadPool = Mockito.mock(ThreadPool::class.java) + clusterService = Mockito.mock(ClusterService::class.java) + + settings = Settings.builder().build() + val settingSet = hashSetOf>() + settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + settingSet.add(AlertingSettings.ALERT_HISTORY_ENABLED) + settingSet.add(AlertingSettings.ALERT_HISTORY_MAX_DOCS) + settingSet.add(AlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE) + settingSet.add(AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD) + settingSet.add(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD) + settingSet.add(AlertingSettings.REQUEST_TIMEOUT) + val discoveryNode = DiscoveryNode("node", buildNewFakeTransportAddress(), Version.CURRENT) + val clusterSettings = ClusterSettings(settings, settingSet) + val testClusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings) + clusterService = Mockito.spy(testClusterService) + + alertIndices = AlertIndices(settings, client, threadPool, clusterService) + alertService = AlertService(client, xContentRegistry, alertIndices) + } + + fun `test getting categorized alerts for bucket-level monitor with no current alerts`() { + val trigger = randomBucketLevelTrigger() + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + + val currentAlerts = mutableMapOf() + val aggResultBuckets = createAggregationResultBucketsFromBucketKeys(listOf( + listOf("a"), + listOf("b") + )) + + val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets) + // Completed Alerts are what remains in currentAlerts after categorization + val completedAlerts = currentAlerts.values.toList() + assertEquals(listOf(), categorizedAlerts[AlertCategory.DEDUPED]) + assertAlertsExistForBucketKeys(listOf( + listOf("a"), + listOf("b") + ), categorizedAlerts[AlertCategory.NEW] ?: error("New alerts not found")) + assertEquals(listOf(), completedAlerts) + } + + fun `test getting categorized alerts for bucket-level monitor with de-duped alerts`() { + val trigger = randomBucketLevelTrigger() + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + + val currentAlerts = createCurrentAlertsFromBucketKeys(monitor, trigger, listOf( + listOf("a"), + listOf("b") + )) + val aggResultBuckets = createAggregationResultBucketsFromBucketKeys(listOf( + listOf("a"), + listOf("b") + )) + + val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets) + // Completed Alerts are what remains in currentAlerts after categorization + val completedAlerts = currentAlerts.values.toList() + assertAlertsExistForBucketKeys(listOf( + listOf("a"), + listOf("b") + ), categorizedAlerts[AlertCategory.DEDUPED] ?: error("Deduped alerts not found")) + assertEquals(listOf(), categorizedAlerts[AlertCategory.NEW]) + assertEquals(listOf(), completedAlerts) + } + + fun `test getting categorized alerts for bucket-level monitor with completed alerts`() { + val trigger = randomBucketLevelTrigger() + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + + val currentAlerts = createCurrentAlertsFromBucketKeys(monitor, trigger, listOf( + listOf("a"), + listOf("b") + )) + val aggResultBuckets = listOf() + + val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets) + // Completed Alerts are what remains in currentAlerts after categorization + val completedAlerts = currentAlerts.values.toList() + assertEquals(listOf(), categorizedAlerts[AlertCategory.DEDUPED]) + assertEquals(listOf(), categorizedAlerts[AlertCategory.NEW]) + assertAlertsExistForBucketKeys(listOf( + listOf("a"), + listOf("b") + ), completedAlerts) + } + + fun `test getting categorized alerts for bucket-level monitor with de-duped and completed alerts`() { + val trigger = randomBucketLevelTrigger() + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + + val currentAlerts = createCurrentAlertsFromBucketKeys(monitor, trigger, listOf( + listOf("a"), + listOf("b") + )) + val aggResultBuckets = createAggregationResultBucketsFromBucketKeys(listOf( + listOf("b"), + listOf("c") + )) + + val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets) + // Completed Alerts are what remains in currentAlerts after categorization + val completedAlerts = currentAlerts.values.toList() + assertAlertsExistForBucketKeys(listOf(listOf("b")), categorizedAlerts[AlertCategory.DEDUPED] ?: error("Deduped alerts not found")) + assertAlertsExistForBucketKeys(listOf(listOf("c")), categorizedAlerts[AlertCategory.NEW] ?: error("New alerts not found")) + assertAlertsExistForBucketKeys(listOf(listOf("a")), completedAlerts) + } + + private fun createCurrentAlertsFromBucketKeys( + monitor: Monitor, + trigger: BucketLevelTrigger, + bucketKeysList: List> + ): MutableMap { + return bucketKeysList.map { bucketKeys -> + val aggResultBucket = AggregationResultBucket("parent_bucket_path", bucketKeys, mapOf()) + val alert = Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = listOf(randomActionExecutionResult()), aggregationResultBucket = aggResultBucket) + aggResultBucket.getBucketKeysHash() to alert + }.toMap() as MutableMap + } + + private fun createAggregationResultBucketsFromBucketKeys(bucketKeysList: List>): List { + return bucketKeysList.map { AggregationResultBucket("parent_bucket_path", it, mapOf()) } + } + + private fun assertAlertsExistForBucketKeys(bucketKeysList: List>, alerts: List) { + // Check if size is equals first for sanity and since bucketKeysList should have unique entries, + // this ensures there shouldn't be duplicates in the alerts + assertEquals(bucketKeysList.size, alerts.size) + val expectedBucketKeyHashes = bucketKeysList.map { it.joinToString(separator = "#") }.toSet() + alerts.forEach { alert -> + assertNotNull(alert.aggregationResultBucket) + assertTrue(expectedBucketKeyHashes.contains(alert.aggregationResultBucket!!.getBucketKeysHash())) + } + } +}