Skip to content

Commit 96154ba

Browse files
Multiple indices support in DocLevelMonitorInput (opensearch-project#784) (opensearch-project#808)
Signed-off-by: Petar Dzepina <[email protected]>
1 parent a263907 commit 96154ba

File tree

10 files changed

+303
-59
lines changed

10 files changed

+303
-59
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
217217
.registerClusterService(clusterService)
218218
.registerClient(client)
219219
.registerNamedXContentRegistry(xContentRegistry)
220+
.registerindexNameExpressionResolver(indexNameExpressionResolver)
220221
.registerScriptService(scriptService)
221222
.registerSettings(settings)
222223
.registerThreadPool(threadPool)

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ package org.opensearch.alerting
88
import org.apache.logging.log4j.LogManager
99
import org.opensearch.ExceptionsHelper
1010
import org.opensearch.OpenSearchStatusException
11-
import org.opensearch.action.admin.indices.get.GetIndexRequest
12-
import org.opensearch.action.admin.indices.get.GetIndexResponse
1311
import org.opensearch.action.index.IndexRequest
1412
import org.opensearch.action.index.IndexResponse
1513
import org.opensearch.action.search.SearchAction
@@ -24,9 +22,11 @@ import org.opensearch.alerting.model.MonitorRunResult
2422
import org.opensearch.alerting.opensearchapi.suspendUntil
2523
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
2624
import org.opensearch.alerting.util.AlertingException
25+
import org.opensearch.alerting.util.IndexUtils
2726
import org.opensearch.alerting.util.defaultToPerExecutionAction
2827
import org.opensearch.alerting.util.getActionExecutionPolicy
2928
import org.opensearch.client.Client
29+
import org.opensearch.cluster.metadata.IndexMetadata
3030
import org.opensearch.cluster.routing.ShardRouting
3131
import org.opensearch.cluster.service.ClusterService
3232
import org.opensearch.common.bytes.BytesReference
@@ -98,7 +98,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
9898
)
9999

100100
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
101-
val index = docLevelMonitorInput.indices[0]
101+
102102
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
103103

104104
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
@@ -111,6 +111,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
111111
val docsToQueries = mutableMapOf<String, MutableList<String>>()
112112

113113
try {
114+
// Resolve all passed indices to concrete indices
115+
val indices = IndexUtils.resolveAllIndices(
116+
docLevelMonitorInput.indices,
117+
monitorCtx.clusterService!!,
118+
monitorCtx.indexNameExpressionResolver!!
119+
)
120+
114121
monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
115122
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
116123
monitor = monitor,
@@ -119,12 +126,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
119126
indexTimeout = monitorCtx.indexTimeout!!
120127
)
121128

122-
val getIndexRequest = GetIndexRequest().indices(index)
123-
val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil {
124-
monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it)
125-
}
126-
val indices = getIndexResponse.indices()
127-
128129
// cleanup old indices that are not monitored anymore from the same monitor
129130
for (ind in updatedLastRunContext.keys) {
130131
if (!indices.contains(ind)) {
@@ -135,8 +136,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
135136
indices.forEach { indexName ->
136137
// Prepare lastRunContext for each index
137138
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
138-
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
139-
MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently)
139+
val isIndexCreatedRecently = createdRecently(
140+
monitor,
141+
periodStart,
142+
periodEnd,
143+
monitorCtx.clusterService!!.state().metadata.index(indexName)
144+
)
145+
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
140146
}
141147

142148
// Prepare updatedLastRunContext for each index
@@ -389,9 +395,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
389395
throw IOException("Invalid input with document-level-monitor.")
390396
}
391397

392-
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
393-
if (docLevelMonitorInput.indices.size > 1) {
394-
throw IOException("Only one index is supported with document-level-monitor.")
398+
if ((monitor.inputs[0] as DocLevelMonitorInput).indices.isEmpty()) {
399+
throw IllegalArgumentException("DocLevelMonitorInput has no indices")
395400
}
396401
}
397402

@@ -418,13 +423,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
418423
// new index is monitored from the beginning of that index
419424
private fun createdRecently(
420425
monitor: Monitor,
421-
index: String,
422426
periodStart: Instant,
423427
periodEnd: Instant,
424-
getIndexResponse: GetIndexResponse
428+
indexMetadata: IndexMetadata
425429
): Boolean {
426430
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
427-
return getIndexResponse.settings.get(index).getAsLong("index.creation_date", 0L) > lastExecutionTime.toEpochMilli()
431+
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
432+
return indexCreationDate > lastExecutionTime.toEpochMilli()
428433
}
429434

430435
/**

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.opensearch.alerting.settings.DestinationSettings
1313
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
1414
import org.opensearch.alerting.util.DocLevelMonitorQueries
1515
import org.opensearch.client.Client
16+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
1617
import org.opensearch.cluster.service.ClusterService
1718
import org.opensearch.common.settings.Settings
1819
import org.opensearch.common.unit.TimeValue
@@ -25,6 +26,7 @@ data class MonitorRunnerExecutionContext(
2526
var clusterService: ClusterService? = null,
2627
var client: Client? = null,
2728
var xContentRegistry: NamedXContentRegistry? = null,
29+
var indexNameExpressionResolver: IndexNameExpressionResolver? = null,
2830
var scriptService: ScriptService? = null,
2931
var settings: Settings? = null,
3032
var threadPool: ThreadPool? = null,

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.opensearch.alerting.util.DocLevelMonitorQueries
3535
import org.opensearch.alerting.util.IndexUtils
3636
import org.opensearch.alerting.util.isDocLevelMonitor
3737
import org.opensearch.client.Client
38+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
3839
import org.opensearch.cluster.service.ClusterService
3940
import org.opensearch.common.component.AbstractLifecycleComponent
4041
import org.opensearch.common.settings.Settings
@@ -75,6 +76,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
7576
return this
7677
}
7778

79+
fun registerindexNameExpressionResolver(indexNameExpressionResolver: IndexNameExpressionResolver): MonitorRunnerService {
80+
this.monitorCtx.indexNameExpressionResolver = indexNameExpressionResolver
81+
return this
82+
}
83+
7884
fun registerScriptService(scriptService: ScriptService): MonitorRunnerService {
7985
this.monitorCtx.scriptService = scriptService
8086
return this

alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.alerting.alerts
77

88
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.ExceptionsHelper
910
import org.opensearch.ResourceAlreadyExistsException
1011
import org.opensearch.action.ActionListener
1112
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
@@ -36,6 +37,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTO
3637
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_RETENTION_PERIOD
3738
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD
3839
import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
40+
import org.opensearch.alerting.util.AlertingException
3941
import org.opensearch.alerting.util.IndexUtils
4042
import org.opensearch.client.Client
4143
import org.opensearch.cluster.ClusterChangedEvent
@@ -357,8 +359,12 @@ class AlertIndices(
357359
return try {
358360
val createIndexResponse: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
359361
createIndexResponse.isAcknowledged
360-
} catch (e: ResourceAlreadyExistsException) {
361-
true
362+
} catch (t: Exception) {
363+
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
364+
true
365+
} else {
366+
throw AlertingException.wrap(t)
367+
}
362368
}
363369
}
364370

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.Dispatchers
1010
import kotlinx.coroutines.launch
1111
import org.apache.logging.log4j.LogManager
12+
import org.opensearch.ExceptionsHelper
1213
import org.opensearch.OpenSearchException
1314
import org.opensearch.OpenSearchSecurityException
1415
import org.opensearch.OpenSearchStatusException
@@ -304,7 +305,7 @@ class TransportIndexMonitorAction @Inject constructor(
304305
}
305306
override fun onFailure(t: Exception) {
306307
// https://github.com/opensearch-project/alerting/issues/646
307-
if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) {
308+
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
308309
scope.launch {
309310
// Wait for the yellow status
310311
val request = ClusterHealthRequest()

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import org.opensearch.action.admin.indices.alias.Alias
1313
import org.opensearch.action.admin.indices.create.CreateIndexRequest
1414
import org.opensearch.action.admin.indices.create.CreateIndexResponse
1515
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
16-
import org.opensearch.action.admin.indices.get.GetIndexRequest
17-
import org.opensearch.action.admin.indices.get.GetIndexResponse
1816
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
1917
import org.opensearch.action.admin.indices.rollover.RolloverRequest
2018
import org.opensearch.action.admin.indices.rollover.RolloverResponse
@@ -26,6 +24,7 @@ import org.opensearch.action.bulk.BulkResponse
2624
import org.opensearch.action.index.IndexRequest
2725
import org.opensearch.action.support.WriteRequest.RefreshPolicy
2826
import org.opensearch.action.support.master.AcknowledgedResponse
27+
import org.opensearch.alerting.MonitorRunnerService.monitorCtx
2928
import org.opensearch.alerting.model.MonitorMetadata
3029
import org.opensearch.alerting.opensearchapi.suspendUntil
3130
import org.opensearch.client.Client
@@ -86,8 +85,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
8685
return try {
8786
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
8887
createIndexResponse.isAcknowledged
89-
} catch (t: ResourceAlreadyExistsException) {
90-
if (t.message?.contains("already exists") == true) {
88+
} catch (t: Exception) {
89+
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
9190
true
9291
} else {
9392
throw t
@@ -107,9 +106,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
107106
admin().indices().delete(DeleteIndexRequest(dataSources.queryIndex), it)
108107
}
109108
if (!acknowledgedResponse.isAcknowledged) {
110-
val errorMessage = "Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!"
111-
log.error(errorMessage)
112-
throw AlertingException.wrap(OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR))
109+
log.warn("Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!")
113110
}
114111
}
115112
val alias = dataSources.queryIndex
@@ -125,8 +122,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
125122
return try {
126123
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
127124
createIndexResponse.isAcknowledged
128-
} catch (t: ResourceAlreadyExistsException) {
129-
if (t.message?.contains("already exists") == true) {
125+
} catch (t: Exception) {
126+
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
130127
true
131128
} else {
132129
throw t
@@ -202,16 +199,15 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
202199
indexTimeout: TimeValue
203200
) {
204201
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
205-
val index = docLevelMonitorInput.indices[0]
206202
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
207203

208-
val clusterState = clusterService.state()
204+
val indices = IndexUtils.resolveAllIndices(
205+
docLevelMonitorInput.indices,
206+
monitorCtx.clusterService!!,
207+
monitorCtx.indexNameExpressionResolver!!
208+
)
209209

210-
val getIndexRequest = GetIndexRequest().indices(index)
211-
val getIndexResponse: GetIndexResponse = client.suspendUntil {
212-
client.admin().indices().getIndex(getIndexRequest, it)
213-
}
214-
val indices = getIndexResponse.indices()
210+
val clusterState = clusterService.state()
215211

216212
// Run through each backing index and apply appropriate mappings to query index
217213
indices?.forEach { indexName ->
@@ -383,7 +379,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
383379

384380
/**
385381
* Adjusts max field limit index setting for query index if source index has higher limit.
386-
* This will prevent max field limit exception, when applying mappings to query index
382+
* This will prevent max field limit exception, when source index has more fields then query index limit
387383
*/
388384
private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) {
389385
val getSettingsResponse: GetSettingsResponse = client.suspendUntil {

alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,21 @@ package org.opensearch.alerting.util
77

88
import org.opensearch.action.ActionListener
99
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
10+
import org.opensearch.action.support.IndicesOptions
1011
import org.opensearch.action.support.master.AcknowledgedResponse
1112
import org.opensearch.alerting.alerts.AlertIndices
1213
import org.opensearch.alerting.core.ScheduledJobIndices
1314
import org.opensearch.client.IndicesAdminClient
1415
import org.opensearch.cluster.ClusterState
1516
import org.opensearch.cluster.metadata.IndexMetadata
17+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
18+
import org.opensearch.cluster.service.ClusterService
1619
import org.opensearch.common.xcontent.LoggingDeprecationHandler
1720
import org.opensearch.common.xcontent.NamedXContentRegistry
1821
import org.opensearch.common.xcontent.XContentParser
1922
import org.opensearch.common.xcontent.XContentType
2023
import org.opensearch.commons.alerting.util.IndexUtils
24+
import org.opensearch.index.IndexNotFoundException
2125

2226
class IndexUtils {
2327

@@ -130,5 +134,26 @@ class IndexUtils {
130134
}
131135
}
132136
}
137+
138+
@JvmStatic
139+
fun resolveAllIndices(indices: List<String>, clusterService: ClusterService, resolver: IndexNameExpressionResolver): List<String> {
140+
val result = mutableListOf<String>()
141+
142+
indices.forEach { index ->
143+
val concreteIndices = resolver.concreteIndexNames(
144+
clusterService.state(),
145+
IndicesOptions.lenientExpand(),
146+
true,
147+
index
148+
)
149+
result.addAll(concreteIndices)
150+
}
151+
152+
if (result.size == 0) {
153+
throw IndexNotFoundException(indices[0])
154+
}
155+
156+
return result
157+
}
133158
}
134159
}

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -428,8 +428,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
428428

429429
val findings = searchFindings(monitor)
430430
assertEquals("Findings saved for test monitor", 2, findings.size)
431-
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
432-
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
431+
val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
432+
assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
433433
}
434434

435435
fun `test execute monitor with new index added after first execution that generates alerts and findings`() {
@@ -458,14 +458,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
458458

459459
var findings = searchFindings(monitor)
460460
assertEquals("Findings saved for test monitor", 2, findings.size)
461-
assertTrue(
462-
"Findings saved for test monitor expected 1 instead of ${findings[0].relatedDocIds}",
463-
findings[0].relatedDocIds.contains("1")
464-
)
465-
assertTrue(
466-
"Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}",
467-
findings[1].relatedDocIds.contains("5")
468-
)
461+
462+
var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
463+
assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size)
469464

470465
// clear previous findings and alerts
471466
deleteIndex(ALL_FINDING_INDEX_PATTERN)
@@ -493,18 +488,11 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
493488

494489
findings = searchFindings(monitor)
495490
assertEquals("Findings saved for test monitor", 3, findings.size)
496-
assertTrue(
497-
"Findings saved for test monitor expected 14 instead of ${findings[0].relatedDocIds}",
498-
findings[0].relatedDocIds.contains("14")
499-
)
500-
assertTrue(
501-
"Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}",
502-
findings[1].relatedDocIds.contains("51")
503-
)
504-
assertTrue(
505-
"Findings saved for test monitor expected 10 instead of ${findings[2].relatedDocIds}",
506-
findings[2].relatedDocIds.contains("10")
507-
)
491+
492+
foundFindings = findings.filter {
493+
it.relatedDocIds.contains("14") || it.relatedDocIds.contains("51") || it.relatedDocIds.contains("10")
494+
}
495+
assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size)
508496
}
509497

510498
fun `test document-level monitor when alias only has write index with 0 docs`() {

0 commit comments

Comments
 (0)