-
Notifications
You must be signed in to change notification settings - Fork 130
Update AlertService for Bucket-Level Alerting #154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,12 +27,17 @@ import org.opensearch.alerting.elasticapi.firstFailureOrNull | |
| import org.opensearch.alerting.elasticapi.retry | ||
| import org.opensearch.alerting.elasticapi.suspendUntil | ||
| import org.opensearch.alerting.model.ActionExecutionResult | ||
| import org.opensearch.alerting.model.ActionRunResult | ||
| import org.opensearch.alerting.model.AggregationResultBucket | ||
| import org.opensearch.alerting.model.Alert | ||
| import org.opensearch.alerting.model.BucketLevelTrigger | ||
| import org.opensearch.alerting.model.Monitor | ||
| import org.opensearch.alerting.model.QueryLevelTriggerRunResult | ||
| import org.opensearch.alerting.model.Trigger | ||
| import org.opensearch.alerting.model.TriggerRunResult | ||
| import org.opensearch.alerting.script.TriggerExecutionContext | ||
| import org.opensearch.alerting.model.action.AlertCategory | ||
| import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext | ||
| import org.opensearch.alerting.util.IndexUtils | ||
| import org.opensearch.alerting.util.getBucketKeysHash | ||
| import org.opensearch.client.Client | ||
| import org.opensearch.common.bytes.BytesReference | ||
| import org.opensearch.common.xcontent.LoggingDeprecationHandler | ||
|
|
@@ -57,29 +62,47 @@ class AlertService( | |
|
|
||
| private val logger = LogManager.getLogger(AlertService::class.java) | ||
|
|
||
| suspend fun loadCurrentAlerts(monitor: Monitor): Map<Trigger, Alert?> { | ||
| val request = SearchRequest(AlertIndices.ALERT_INDEX) | ||
| .routing(monitor.id) | ||
| .source(alertQuery(monitor)) | ||
| val response: SearchResponse = client.suspendUntil { client.search(request, it) } | ||
| if (response.status() != RestStatus.OK) { | ||
| throw (response.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts")) | ||
| } | ||
| suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor): Map<Trigger, Alert?> { | ||
| val searchAlertsResponse: SearchResponse = searchAlerts( | ||
| monitorId = monitor.id, | ||
| size = monitor.triggers.size * 2 // We expect there to be only a single in-progress alert so fetch 2 to check | ||
| ) | ||
|
|
||
| val foundAlerts = response.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) } | ||
| val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) } | ||
| .groupBy { it.triggerId } | ||
| foundAlerts.values.forEach { alerts -> | ||
| if (alerts.size > 1) { | ||
| logger.warn("Found multiple alerts for same trigger: $alerts") | ||
| } | ||
| } | ||
|
|
||
| return monitor.triggers.associate { trigger -> | ||
| trigger to (foundAlerts[trigger.id]?.firstOrNull()) | ||
| return monitor.triggers.associateWith { trigger -> | ||
| foundAlerts[trigger.id]?.firstOrNull() | ||
| } | ||
| } | ||
|
|
||
/**
 * Loads the Alerts currently stored for a Bucket-Level [monitor] and groups them per Trigger.
 *
 * For every Trigger of the Monitor the returned map holds a mutable map of bucket-keys hash -> Alert.
 * Alerts without an `aggregationResultBucket` are left out. A Trigger with no matching Alerts is mapped
 * to an empty mutable map so that callers can categorize Alerts without null checks.
 *
 * The inner maps are always genuinely mutable: callers remove entries from them while categorizing
 * (see the de-dupe logic in this class), which is not safe on the read-only maps that `toMap()` may
 * return for empty or single-element input.
 *
 * @param monitor The Monitor whose Alerts should be loaded
 * @return A map of each Trigger of [monitor] to its current Alerts keyed by bucket-keys hash
 */
suspend fun loadCurrentAlertsForBucketLevelMonitor(monitor: Monitor): Map<Trigger, MutableMap<String, Alert>> {
    val searchAlertsResponse: SearchResponse = searchAlerts(
        monitorId = monitor.id,
        size = 500 // TODO: This should be a constant and limited based on the circuit breaker that limits Alerts
    )

    val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
        .groupBy { it.triggerId }

    return monitor.triggers.associateWith { trigger ->
        // Collect directly into a mutable map instead of casting the result of toMap(); default to an
        // empty map if there are no Alerts found for a Trigger to make Alert categorization logic easier.
        foundAlerts[trigger.id]
            ?.mapNotNull { alert ->
                alert.aggregationResultBucket?.let { it.getBucketKeysHash() to alert }
            }
            ?.toMap(mutableMapOf<String, Alert>())
            ?: mutableMapOf()
    }
}
|
|
||
| fun composeAlert(ctx: TriggerExecutionContext, result: TriggerRunResult, alertError: AlertError?): Alert? { | ||
| fun composeQueryLevelAlert( | ||
| ctx: QueryLevelTriggerExecutionContext, | ||
| result: QueryLevelTriggerRunResult, | ||
| alertError: AlertError? | ||
| ): Alert? { | ||
| val currentTime = Instant.now() | ||
| val currentAlert = ctx.alert | ||
|
|
||
|
|
@@ -100,11 +123,11 @@ class AlertService( | |
| } | ||
| } | ||
| // add action execution results which not exist in current alert | ||
| updatedActionExecutionResults.addAll(result.actionResults.filter { it -> !currentActionIds.contains(it.key) } | ||
| .map { it -> ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }) | ||
| updatedActionExecutionResults.addAll(result.actionResults.filter { !currentActionIds.contains(it.key) } | ||
| .map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }) | ||
| } else { | ||
| updatedActionExecutionResults.addAll(result.actionResults.map { it -> ActionExecutionResult(it.key, it.value.executionTime, | ||
| if (it.value.throttled) 1 else 0) }) | ||
| updatedActionExecutionResults.addAll(result.actionResults.map { | ||
| ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }) | ||
| } | ||
|
|
||
| // Merge the alert's error message to the current alert's history | ||
|
|
@@ -129,6 +152,92 @@ class AlertService( | |
| } | ||
| } | ||
|
|
||
/**
 * Produces a copy of [currentAlert] whose action execution results and error history reflect the latest run.
 *
 * Existing action execution results are carried over: untouched when the action has no entry in
 * [actionResults], with `throttledCount` incremented when the action was throttled, and with
 * `lastExecutionTime` refreshed otherwise. Actions that appear only in [actionResults] are appended.
 *
 * @param currentAlert The Alert to update
 * @param actionResults Results of the actions that ran, keyed by action ID
 * @param alertError The error to record, or null if there was none
 * @return The updated Alert; when [alertError] is non-null its state is set to ERROR and its
 * `errorMessage` to the error's message, otherwise state and `errorMessage` are left as they were
 */
fun updateActionResultsForBucketLevelAlert(
    currentAlert: Alert,
    actionResults: Map<String, ActionRunResult>,
    alertError: AlertError?
): Alert {
    // Refresh the action execution results the alert already has
    val mergedResults = currentAlert.actionExecutionResults.mapTo(mutableListOf<ActionExecutionResult>()) { previous ->
        val latestRun = actionResults[previous.actionId]
        when {
            latestRun == null -> previous
            latestRun.throttled -> previous.copy(throttledCount = previous.throttledCount + 1)
            else -> previous.copy(lastExecutionTime = latestRun.executionTime)
        }
    }

    // Append results for actions the alert has not recorded before
    val knownActionIds = currentAlert.actionExecutionResults.map { it.actionId }.toSet()
    actionResults
        .filterKeys { it !in knownActionIds }
        .mapTo(mergedResults) { (actionId, runResult) ->
            ActionExecutionResult(actionId, runResult.executionTime, if (runResult.throttled) 1 else 0)
        }

    // NOTE(review): a reviewer asked whether the full error history is always loaded, since it could occupy
    // a large share of heap. The author replied that errorHistory appears to be capped; the limit is not
    // visible here, so confirm against the update() helper.
    val mergedErrorHistory = currentAlert.errorHistory.update(alertError)

    if (alertError == null) {
        return currentAlert.copy(errorHistory = mergedErrorHistory, actionExecutionResults = mergedResults)
    }
    return currentAlert.copy(
        state = Alert.State.ERROR,
        errorMessage = alertError.message,
        errorHistory = mergedErrorHistory,
        actionExecutionResults = mergedResults
    )
}
|
|
||
| // TODO: Can change the parameters to use ctx: BucketLevelTriggerExecutionContext instead of monitor/trigger and | ||
| // result: AggTriggerRunResult for aggResultBuckets | ||
| // TODO: Can refactor this method to use Sets instead which can cleanup some of the categorization logic (like getting completed alerts) | ||
/**
 * Splits the buckets returned for a Trigger into de-duped Alerts (a current Alert already exists for the
 * bucket) and new Alerts (no current Alert exists for the bucket).
 *
 * Side effect: every de-duped Alert is removed from [currentAlerts]. Whatever remains in [currentAlerts]
 * after the call had no matching bucket and is therefore a candidate for a completed Alert.
 *
 * @param monitor The Monitor the Alerts belong to
 * @param trigger The Bucket-Level Trigger the buckets were produced for
 * @param currentAlerts Current Alerts of the Trigger keyed by bucket-keys hash; mutated by this method
 * @param aggResultBuckets The buckets that matched the Trigger in this run
 * @return A map with the [AlertCategory.DEDUPED] and [AlertCategory.NEW] Alerts
 */
fun getCategorizedAlertsForBucketLevelMonitor(
    monitor: Monitor,
    trigger: BucketLevelTrigger,
    currentAlerts: MutableMap<String, Alert>,
    aggResultBuckets: List<AggregationResultBucket>
): Map<AlertCategory, List<Alert>> {
    val now = Instant.now()
    val deduped = mutableListOf<Alert>()
    val created = mutableListOf<Alert>()

    for (bucket in aggResultBuckets) {
        val existing = currentAlerts[bucket.getBucketKeysHash()]

        if (existing == null) {
            // New Alert
            // TODO: Setting lastNotificationTime is deceiving since the actions haven't run yet, maybe it should be null here
            created.add(
                Alert(
                    monitor = monitor,
                    trigger = trigger,
                    startTime = now,
                    lastNotificationTime = now,
                    state = Alert.State.ACTIVE,
                    errorMessage = null,
                    errorHistory = mutableListOf(),
                    actionExecutionResults = mutableListOf(),
                    schemaVersion = IndexUtils.alertIndexSchemaVersion,
                    aggregationResultBucket = bucket
                )
            )
            continue
        }

        // De-duped Alert: keep the existing Alert but refresh its notification time and bucket
        deduped.add(existing.copy(lastNotificationTime = now, aggregationResultBucket = bucket))

        // A de-duped Alert is no longer a candidate for a potentially completed Alert
        currentAlerts.remove(bucket.getBucketKeysHash())
    }

    return mapOf(
        AlertCategory.DEDUPED to deduped,
        AlertCategory.NEW to created
    )
}
|
|
||
/**
 * Converts the given Alerts into COMPLETED Alerts.
 *
 * Each returned Alert is a copy with its state set to COMPLETED, its end time set to the current time,
 * its error message cleared and its schema version set to [IndexUtils.alertIndexSchemaVersion].
 *
 * @param currentAlerts The Alerts to complete keyed by bucket-keys hash, or null
 * @return The completed Alerts, or an empty list when [currentAlerts] is null
 */
fun convertToCompletedAlerts(currentAlerts: Map<String, Alert>?): List<Alert> {
    val completionTime = Instant.now()
    if (currentAlerts == null) return listOf()

    return currentAlerts.values.map { alert ->
        alert.copy(
            state = Alert.State.COMPLETED,
            endTime = completionTime,
            errorMessage = null,
            schemaVersion = IndexUtils.alertIndexSchemaVersion
        )
    }
}
|
|
||
| suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy) { | ||
| var requestsToRetry = alerts.flatMap { alert -> | ||
| // We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts. | ||
|
|
@@ -177,17 +286,98 @@ class AlertService( | |
| } | ||
| } | ||
|
|
||
/**
 * This is a separate method created specifically for saving new Alerts during the Bucket-Level Monitor run.
 * Alerts are saved in two batches during the execution of a Bucket-Level Monitor, once before the Actions are executed
 * and once afterwards. This method saves Alerts to the [AlertIndices.ALERT_INDEX] but returns the same Alerts with their document IDs.
 *
 * The Alerts are required with their indexed ID so that when the new Alerts are updated after the Action execution,
 * the ID is available for the index request so that the existing Alert can be updated, instead of creating a duplicate Alert document.
 *
 * Index requests that fail with [RestStatus.TOO_MANY_REQUESTS] are retried according to [retryPolicy].
 * Requests that fail with any other status are not retried; the failure is logged and the Alert is
 * left out of the returned list.
 *
 * @param alerts The new Alerts to index; each must be ACTIVE and must not have a document ID yet
 * @param retryPolicy The backoff policy used for retrying throttled index requests
 * @return The Alerts that were indexed successfully, each carrying its document ID
 * @throws IllegalStateException if an Alert is not ACTIVE or already has an ID
 */
suspend fun saveNewAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy): List<Alert> {
    val savedAlerts = mutableListOf<Alert>()
    var alertsBeingIndexed = alerts
    var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
        check(alert.state == Alert.State.ACTIVE) {
            "Unexpected attempt to save new alert [$alert] with state [${alert.state}]"
        }
        check(alert.id == Alert.NO_ID) {
            "Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]"
        }
        IndexRequest(AlertIndices.ALERT_INDEX)
            .routing(alert.monitorId)
            .source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
    }.toMutableList()

    if (requestsToRetry.isEmpty()) return listOf()

    // Retry Bulk requests if there was any 429 response.
    // The responses of a bulk request will be in the same order as the individual requests.
    // If the index request succeeded for an Alert, the document ID from the response is taken and saved in the Alert.
    // If the index request is to be retried, the Alert is saved separately as well so that its relative ordering is maintained in
    // relation to index request in the retried bulk request for when it eventually succeeds.
    retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
        val bulkRequest = BulkRequest().add(requestsToRetry)
        val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
        // Guard once against a missing items array and use the same view for every pass below
        val responseItems = bulkResponse.items ?: arrayOf()

        requestsToRetry = mutableListOf()
        val alertsBeingRetried = mutableListOf<Alert>()
        responseItems.forEach { item ->
            if (!item.isFailed) {
                // The ID of the BulkItemResponse in this case is the document ID resulting from the DocWriteRequest operation
                savedAlerts.add(alertsBeingIndexed[item.itemId].copy(id = item.id))
            } else if (item.status() == RestStatus.TOO_MANY_REQUESTS) {
                requestsToRetry.add(bulkRequest.requests()[item.itemId] as IndexRequest)
                alertsBeingRetried.add(alertsBeingIndexed[item.itemId])
            } else {
                // Not retryable: surface the failure instead of dropping the Alert silently
                logger.error(
                    "Failed to save new alert [${alertsBeingIndexed[item.itemId]}] " +
                        "with status [${item.status()}]: ${item.failureMessage}"
                )
            }
        }

        alertsBeingIndexed = alertsBeingRetried

        if (requestsToRetry.isNotEmpty()) {
            // Throwing here hands control back to the retry policy with the first throttling cause
            val retryCause = responseItems
                .first { it.isFailed && it.status() == RestStatus.TOO_MANY_REQUESTS }
                .failure.cause
            throw ExceptionsHelper.convertToOpenSearchException(retryCause)
        }
    }

    return savedAlerts
}
|
|
||
/**
 * Creates a JSON [XContentParser] over [bytesReference] and advances it onto the opening token,
 * which must be START_OBJECT.
 *
 * @param bytesReference The raw JSON document to parse
 * @return A parser positioned on the START_OBJECT token of the document
 */
private fun contentParser(bytesReference: BytesReference): XContentParser =
    XContentHelper.createParser(
        xContentRegistry,
        LoggingDeprecationHandler.INSTANCE,
        bytesReference,
        XContentType.JSON
    ).also { parser ->
        XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser)
    }
|
|
||
| private fun alertQuery(monitor: Monitor): SearchSourceBuilder { | ||
| return SearchSourceBuilder.searchSource() | ||
| .size(monitor.triggers.size * 2) // We expect there to be only a single in-progress alert so fetch 2 to check | ||
| .query(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id)) | ||
/**
 * Searches the [AlertIndices.ALERT_INDEX] for the Alerts of a single Monitor.
 *
 * The search is filtered on the Monitor ID field and routed by the Monitor ID.
 *
 * @param monitorId The Monitor to get Alerts for
 * @param size The number of search hits (Alerts) to return
 * @return The search response when its status is OK
 * @throws Exception the cause of the first failure in the response, or a [RuntimeException] when the
 * response is not OK and no failure cause is available
 */
private suspend fun searchAlerts(monitorId: String, size: Int): SearchResponse {
    val monitorFilter = QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId)
    val source = SearchSourceBuilder()
        .size(size)
        .query(QueryBuilders.boolQuery().filter(monitorFilter))
    val request = SearchRequest(AlertIndices.ALERT_INDEX)
        .routing(monitorId)
        .source(source)

    val response: SearchResponse = client.suspendUntil { client.search(request, it) }
    if (response.status() == RestStatus.OK) return response

    throw (response.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts"))
}
|
|
||
| private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this constant defined here looks ugly, move somewhere else if it makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This size of 500 could be too big and consume a big chunk of heap if the individual buckets are large. Would loading alerts in a paginated way help here?
We have a requirement in
`MonitorRunner` to load all alerts and then dedupe any existing ones. Can this process of dedupe/categorization be done in a paginated way? If we load all current alerts in an ordered way, and paginate and keep discarding old pages while categorizing the new alerts, wouldn't that help in limiting the memory here? We don't have to fix it now, but it is something we can think about, and we could create an issue for future optimization if you think it's feasible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, the reason this was a hardcoded value and not defined as a constant was because it was arbitrary for the time being. We should discuss with @elfisher and see where we want to draw the cutoff.
The problem is that this size is limiting the Alerts that are being fetched at the Monitor level whereas any circuit breakers we add might make more sense at the Trigger level (since that's where we de-dupe). Imposing a circuit breaker in general to the max Alert count will require some refactoring in the MonitorRunner logic since currently it is indexing new/de-duped Alerts as it paginates composite agg results (so we wouldn't be able to circuit break halfway without having partial results unless we removed that behavior).
For the time being, I'll move this value of
`500` to a constant variable and, once all changes are in main, at the very least I'll add a `warn` log when this value has been exceeded at the Monitor level. If time permits, let's discuss a circuit breaker option (and whether we are willing to fail the Monitor here). Either way, we will be creating an issue for this if it is not part of the initial release.