Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.QueryLevelTrigger
Expand Down Expand Up @@ -366,15 +367,24 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
val triggerResults = mutableMapOf<String, BucketLevelTriggerRunResult>()
val triggerContexts = mutableMapOf<String, BucketLevelTriggerExecutionContext>()
val nextAlerts = mutableMapOf<String, MutableMap<AlertCategory, MutableList<Alert>>>()
var firstIteration = true
var firstPageOfInputResults = InputRunResults(listOf(), null)
do {
// TODO: Since a composite aggregation is being used for the input query, the total bucket count cannot be determined.
// If a setting is imposed that limits buckets that can be processed for Bucket-Level Monitors, we'd need to iterate over
// the buckets until we hit that threshold. In that case, we'd want to exit the execution without creating any alerts since the
// buckets we iterate over before hitting the limit is not deterministic. Is there a better way to fail faster in this case?
runBlocking(InjectorContextElement(monitor.id, settings, threadPool.threadContext, roles)) {
monitorResult = monitorResult.copy(
inputResults = inputService.collectInputResults(monitor, periodStart, periodEnd, monitorResult.inputResults)
)
// Storing the first page of results in the case of pagination input results to prevent empty results
// in the final output of monitorResult which occurs when all pages have been exhausted.
// If it's favorable to return the last page, will need to check how to accomplish that with multiple aggregation paths
// with different page counts.
val inputResults = inputService.collectInputResults(monitor, periodStart, periodEnd, monitorResult.inputResults)
if (firstIteration) {
firstPageOfInputResults = inputResults
firstIteration = false
}
monitorResult = monitorResult.copy(inputResults = inputResults)
}

for (trigger in monitor.triggers) {
Expand Down Expand Up @@ -528,7 +538,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false)
}

return monitorResult.copy(triggerResults = triggerResults)
return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
}

private fun getRolesForMonitor(monitor: Monitor): List<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ class MonitorRunnerIT : AlertingRestTestCase() {
// The last page (when after_key is null) is empty if all the contents fit on the previous page, meaning the
// input results returned by the monitor execution is empty.
// Skipping this test for now until this is resolved to show a non-empty result.
fun `skip test execute bucket-level monitor returns search result`() {
fun `test execute bucket-level monitor returns search result`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
testIndex,
Expand Down