Skip to content

Commit c3dfa9e

Browse files
petardzlezzago
authored andcommitted
Multiple indices support in DocLevelMonitorInput (opensearch-project#784)
Signed-off-by: Petar Dzepina <[email protected]>
1 parent dad275d commit c3dfa9e

File tree

10 files changed

+397
-56
lines changed

10 files changed

+397
-56
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
221221
.registerClusterService(clusterService)
222222
.registerClient(client)
223223
.registerNamedXContentRegistry(xContentRegistry)
224+
.registerindexNameExpressionResolver(indexNameExpressionResolver)
224225
.registerScriptService(scriptService)
225226
.registerSettings(settings)
226227
.registerThreadPool(threadPool)

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ package org.opensearch.alerting
77

88
import org.apache.logging.log4j.LogManager
99
import org.opensearch.OpenSearchStatusException
10-
import org.opensearch.action.admin.indices.get.GetIndexRequest
11-
import org.opensearch.action.admin.indices.get.GetIndexResponse
1210
import org.opensearch.action.index.IndexRequest
1311
import org.opensearch.action.index.IndexResponse
1412
import org.opensearch.action.search.SearchAction
@@ -34,9 +32,11 @@ import org.opensearch.alerting.opensearchapi.string
3432
import org.opensearch.alerting.opensearchapi.suspendUntil
3533
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
3634
import org.opensearch.alerting.util.AlertingException
35+
import org.opensearch.alerting.util.IndexUtils
3736
import org.opensearch.alerting.util.defaultToPerExecutionAction
3837
import org.opensearch.alerting.util.getActionExecutionPolicy
3938
import org.opensearch.client.Client
39+
import org.opensearch.cluster.metadata.IndexMetadata
4040
import org.opensearch.cluster.routing.ShardRouting
4141
import org.opensearch.cluster.service.ClusterService
4242
import org.opensearch.common.bytes.BytesReference
@@ -53,7 +53,7 @@ import org.opensearch.search.builder.SearchSourceBuilder
5353
import org.opensearch.search.sort.SortOrder
5454
import java.io.IOException
5555
import java.time.Instant
56-
import java.util.UUID
56+
import java.util.*
5757
import kotlin.math.max
5858

5959
object DocumentLevelMonitorRunner : MonitorRunner() {
@@ -94,7 +94,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
9494
)
9595

9696
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
97-
val index = docLevelMonitorInput.indices[0]
97+
9898
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
9999

100100
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
@@ -107,6 +107,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
107107
val docsToQueries = mutableMapOf<String, MutableList<String>>()
108108

109109
try {
110+
// Resolve all passed indices to concrete indices
111+
val indices = IndexUtils.resolveAllIndices(
112+
docLevelMonitorInput.indices,
113+
monitorCtx.clusterService!!,
114+
monitorCtx.indexNameExpressionResolver!!
115+
)
116+
110117
monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex()
111118
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
112119
monitor = monitor,
@@ -115,12 +122,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
115122
indexTimeout = monitorCtx.indexTimeout!!
116123
)
117124

118-
val getIndexRequest = GetIndexRequest().indices(index)
119-
val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil {
120-
monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it)
121-
}
122-
val indices = getIndexResponse.indices()
123-
124125
// cleanup old indices that are not monitored anymore from the same monitor
125126
for (ind in updatedLastRunContext.keys) {
126127
if (!indices.contains(ind)) {
@@ -131,8 +132,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
131132
indices.forEach { indexName ->
132133
// Prepare lastRunContext for each index
133134
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
134-
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
135-
MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently)
135+
val isIndexCreatedRecently = createdRecently(
136+
monitor,
137+
periodStart,
138+
periodEnd,
139+
monitorCtx.clusterService!!.state().metadata.index(indexName)
140+
)
141+
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
136142
}
137143

138144
// Prepare updatedLastRunContext for each index
@@ -182,7 +188,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
182188
}
183189
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
184190
} catch (e: Exception) {
185-
logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e)
191+
logger.error("Failed to start Document-level-monitor ${monitor.name}. Error: ${e.message}", e)
186192
val alertingException = AlertingException.wrap(e)
187193
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
188194
}
@@ -381,9 +387,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
381387
throw IOException("Invalid input with document-level-monitor.")
382388
}
383389

384-
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
385-
if (docLevelMonitorInput.indices.size > 1) {
386-
throw IOException("Only one index is supported with document-level-monitor.")
390+
if ((monitor.inputs[0] as DocLevelMonitorInput).indices.isEmpty()) {
391+
throw IllegalArgumentException("DocLevelMonitorInput has no indices")
387392
}
388393
}
389394

@@ -410,13 +415,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
410415
// new index is monitored from the beginning of that index
411416
private fun createdRecently(
412417
monitor: Monitor,
413-
index: String,
414418
periodStart: Instant,
415419
periodEnd: Instant,
416-
getIndexResponse: GetIndexResponse
420+
indexMetadata: IndexMetadata
417421
): Boolean {
418422
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
419-
return getIndexResponse.settings.get(index).getAsLong("index.creation_date", 0L) > lastExecutionTime.toEpochMilli()
423+
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
424+
return indexCreationDate > lastExecutionTime.toEpochMilli()
420425
}
421426

422427
/**

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.opensearch.alerting.settings.DestinationSettings
1313
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
1414
import org.opensearch.alerting.util.DocLevelMonitorQueries
1515
import org.opensearch.client.Client
16+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
1617
import org.opensearch.cluster.service.ClusterService
1718
import org.opensearch.common.settings.Settings
1819
import org.opensearch.common.unit.TimeValue
@@ -25,6 +26,7 @@ data class MonitorRunnerExecutionContext(
2526
var clusterService: ClusterService? = null,
2627
var client: Client? = null,
2728
var xContentRegistry: NamedXContentRegistry? = null,
29+
var indexNameExpressionResolver: IndexNameExpressionResolver? = null,
2830
var scriptService: ScriptService? = null,
2931
var settings: Settings? = null,
3032
var threadPool: ThreadPool? = null,

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import org.opensearch.alerting.util.DocLevelMonitorQueries
3636
import org.opensearch.alerting.util.isBucketLevelMonitor
3737
import org.opensearch.alerting.util.isDocLevelMonitor
3838
import org.opensearch.client.Client
39+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
3940
import org.opensearch.cluster.service.ClusterService
4041
import org.opensearch.common.component.AbstractLifecycleComponent
4142
import org.opensearch.common.settings.Settings
@@ -72,6 +73,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
7273
return this
7374
}
7475

76+
fun registerindexNameExpressionResolver(indexNameExpressionResolver: IndexNameExpressionResolver): MonitorRunnerService {
77+
this.monitorCtx.indexNameExpressionResolver = indexNameExpressionResolver
78+
return this
79+
}
80+
7581
fun registerScriptService(scriptService: ScriptService): MonitorRunnerService {
7682
this.monitorCtx.scriptService = scriptService
7783
return this

alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.alerting.alerts
77

88
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.ExceptionsHelper
910
import org.opensearch.ResourceAlreadyExistsException
1011
import org.opensearch.action.ActionListener
1112
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
@@ -36,6 +37,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTO
3637
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_RETENTION_PERIOD
3738
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD
3839
import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
40+
import org.opensearch.alerting.util.AlertingException
3941
import org.opensearch.alerting.util.IndexUtils
4042
import org.opensearch.client.Client
4143
import org.opensearch.cluster.ClusterChangedEvent
@@ -293,8 +295,12 @@ class AlertIndices(
293295
return try {
294296
val createIndexResponse: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
295297
createIndexResponse.isAcknowledged
296-
} catch (e: ResourceAlreadyExistsException) {
297-
true
298+
} catch (t: Exception) {
299+
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
300+
true
301+
} else {
302+
throw AlertingException.wrap(t)
303+
}
298304
}
299305
}
300306

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.Dispatchers
1010
import kotlinx.coroutines.launch
1111
import org.apache.logging.log4j.LogManager
12+
import org.opensearch.ExceptionsHelper
1213
import org.opensearch.OpenSearchException
1314
import org.opensearch.OpenSearchSecurityException
1415
import org.opensearch.OpenSearchStatusException
@@ -260,7 +261,7 @@ class TransportIndexMonitorAction @Inject constructor(
260261
}
261262
override fun onFailure(t: Exception) {
262263
// https://github.com/opensearch-project/alerting/issues/646
263-
if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) {
264+
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
264265
scope.launch {
265266
// Wait for the yellow status
266267
val request = ClusterHealthRequest()

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import org.opensearch.action.admin.indices.alias.Alias
1313
import org.opensearch.action.admin.indices.create.CreateIndexRequest
1414
import org.opensearch.action.admin.indices.create.CreateIndexResponse
1515
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
16-
import org.opensearch.action.admin.indices.get.GetIndexRequest
17-
import org.opensearch.action.admin.indices.get.GetIndexResponse
1816
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
1917
import org.opensearch.action.admin.indices.rollover.RolloverRequest
2018
import org.opensearch.action.admin.indices.rollover.RolloverResponse
@@ -26,6 +24,7 @@ import org.opensearch.action.bulk.BulkResponse
2624
import org.opensearch.action.index.IndexRequest
2725
import org.opensearch.action.support.WriteRequest.RefreshPolicy
2826
import org.opensearch.action.support.master.AcknowledgedResponse
27+
import org.opensearch.alerting.MonitorRunnerService.monitorCtx
2928
import org.opensearch.alerting.core.model.DocLevelMonitorInput
3029
import org.opensearch.alerting.core.model.DocLevelQuery
3130
import org.opensearch.alerting.core.model.ScheduledJob
@@ -85,8 +84,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
8584
return try {
8685
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
8786
createIndexResponse.isAcknowledged
88-
} catch (t: ResourceAlreadyExistsException) {
89-
if (t.message?.contains("already exists") == true) {
87+
} catch (t: Exception) {
88+
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
9089
true
9190
} else {
9291
throw t
@@ -161,16 +160,15 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
161160
indexTimeout: TimeValue
162161
) {
163162
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
164-
val index = docLevelMonitorInput.indices[0]
165163
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
166164

167-
val clusterState = clusterService.state()
165+
val indices = IndexUtils.resolveAllIndices(
166+
docLevelMonitorInput.indices,
167+
monitorCtx.clusterService!!,
168+
monitorCtx.indexNameExpressionResolver!!
169+
)
168170

169-
val getIndexRequest = GetIndexRequest().indices(index)
170-
val getIndexResponse: GetIndexResponse = client.suspendUntil {
171-
client.admin().indices().getIndex(getIndexRequest, it)
172-
}
173-
val indices = getIndexResponse.indices()
171+
val clusterState = clusterService.state()
174172

175173
// Run through each backing index and apply appropriate mappings to query index
176174
indices?.forEach { indexName ->
@@ -334,7 +332,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
334332

335333
/**
336334
* Adjusts max field limit index setting for query index if source index has higher limit.
337-
* This will prevent max field limit exception, when applying mappings to query index
335+
* This will prevent max field limit exception, when source index has more fields then query index limit
338336
*/
339337
private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) {
340338
val getSettingsResponse: GetSettingsResponse = client.suspendUntil {

alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,20 @@ package org.opensearch.alerting.util
77

88
import org.opensearch.action.ActionListener
99
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
10+
import org.opensearch.action.support.IndicesOptions
1011
import org.opensearch.action.support.master.AcknowledgedResponse
1112
import org.opensearch.alerting.alerts.AlertIndices
1213
import org.opensearch.alerting.core.ScheduledJobIndices
1314
import org.opensearch.client.IndicesAdminClient
1415
import org.opensearch.cluster.ClusterState
1516
import org.opensearch.cluster.metadata.IndexMetadata
17+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
18+
import org.opensearch.cluster.service.ClusterService
1619
import org.opensearch.common.xcontent.LoggingDeprecationHandler
1720
import org.opensearch.common.xcontent.NamedXContentRegistry
1821
import org.opensearch.common.xcontent.XContentParser
1922
import org.opensearch.common.xcontent.XContentType
23+
import org.opensearch.index.IndexNotFoundException
2024

2125
class IndexUtils {
2226

@@ -130,5 +134,26 @@ class IndexUtils {
130134
}
131135
}
132136
}
137+
138+
@JvmStatic
139+
fun resolveAllIndices(indices: List<String>, clusterService: ClusterService, resolver: IndexNameExpressionResolver): List<String> {
140+
val result = mutableListOf<String>()
141+
142+
indices.forEach { index ->
143+
val concreteIndices = resolver.concreteIndexNames(
144+
clusterService.state(),
145+
IndicesOptions.lenientExpand(),
146+
true,
147+
index
148+
)
149+
result.addAll(concreteIndices)
150+
}
151+
152+
if (result.size == 0) {
153+
throw IndexNotFoundException(indices[0])
154+
}
155+
156+
return result
157+
}
133158
}
134159
}

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -379,8 +379,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
379379

380380
val findings = searchFindings(monitor)
381381
assertEquals("Findings saved for test monitor", 2, findings.size)
382-
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
383-
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
382+
val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
383+
assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
384384
}
385385

386386
fun `test execute monitor with new index added after first execution that generates alerts and findings`() {
@@ -409,14 +409,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
409409

410410
var findings = searchFindings(monitor)
411411
assertEquals("Findings saved for test monitor", 2, findings.size)
412-
assertTrue(
413-
"Findings saved for test monitor expected 1 instead of ${findings[0].relatedDocIds}",
414-
findings[0].relatedDocIds.contains("1")
415-
)
416-
assertTrue(
417-
"Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}",
418-
findings[1].relatedDocIds.contains("5")
419-
)
412+
413+
var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
414+
assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size)
420415

421416
// clear previous findings and alerts
422417
deleteIndex(ALL_FINDING_INDEX_PATTERN)
@@ -444,18 +439,11 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
444439

445440
findings = searchFindings(monitor)
446441
assertEquals("Findings saved for test monitor", 3, findings.size)
447-
assertTrue(
448-
"Findings saved for test monitor expected 14 instead of ${findings[0].relatedDocIds}",
449-
findings[0].relatedDocIds.contains("14")
450-
)
451-
assertTrue(
452-
"Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}",
453-
findings[1].relatedDocIds.contains("51")
454-
)
455-
assertTrue(
456-
"Findings saved for test monitor expected 10 instead of ${findings[2].relatedDocIds}",
457-
findings[2].relatedDocIds.contains("10")
458-
)
442+
443+
foundFindings = findings.filter {
444+
it.relatedDocIds.contains("14") || it.relatedDocIds.contains("51") || it.relatedDocIds.contains("10")
445+
}
446+
assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size)
459447
}
460448

461449
fun `test document-level monitor when alias only has write index with 0 docs`() {

0 commit comments

Comments
 (0)