@@ -9,12 +9,15 @@ import org.apache.logging.log4j.LogManager
99import org.opensearch.ExceptionsHelper
1010import org.opensearch.OpenSearchStatusException
1111import org.opensearch.action.ActionListener
12+ import org.opensearch.action.DocWriteRequest
13+ import org.opensearch.action.admin.indices.refresh.RefreshAction
14+ import org.opensearch.action.admin.indices.refresh.RefreshRequest
15+ import org.opensearch.action.bulk.BulkRequest
16+ import org.opensearch.action.bulk.BulkResponse
1217import org.opensearch.action.index.IndexRequest
13- import org.opensearch.action.index.IndexResponse
1418import org.opensearch.action.search.SearchAction
1519import org.opensearch.action.search.SearchRequest
1620import org.opensearch.action.search.SearchResponse
17- import org.opensearch.action.support.WriteRequest
1821import org.opensearch.alerting.model.DocumentExecutionContext
1922import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
2023import org.opensearch.alerting.model.InputRunResults
@@ -264,10 +267,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
264267 // If there are no triggers defined, we still want to generate findings
265268 if (monitor.triggers.isEmpty()) {
266269 if (dryrun == false && monitor.id != Monitor .NO_ID ) {
267- docsToQueries.forEach {
268- val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
269- createFindings(monitor, monitorCtx, triggeredQueries, it.key, true )
270- }
270+ createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true )
271271 }
272272 } else {
273273 monitor.triggers.forEach {
@@ -349,35 +349,39 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
349349 trigger : DocumentLevelTrigger ,
350350 monitor : Monitor ,
351351 idQueryMap : Map <String , DocLevelQuery >,
352- docsToQueries : Map <String , List <String >>,
352+ docsToQueries : MutableMap <String , MutableList <String >>,
353353 queryToDocIds : Map <DocLevelQuery , Set <String >>,
354354 dryrun : Boolean
355355 ): DocumentLevelTriggerRunResult {
356356 val triggerCtx = DocumentLevelTriggerExecutionContext (monitor, trigger)
357357 val triggerResult = monitorCtx.triggerService!! .runDocLevelTrigger(monitor, trigger, queryToDocIds)
358358
359- val findings = mutableListOf<String >()
360- val findingDocPairs = mutableListOf<Pair <String , String >>()
359+ val triggerFindingDocPairs = mutableListOf<Pair <String , String >>()
361360
362361 // TODO: Implement throttling for findings
363- docsToQueries.forEach {
364- val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
365- val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key, ! dryrun && monitor.id != Monitor .NO_ID )
366- findings.add(findingId)
362+ val findingToDocPairs = createFindings(
363+ monitor,
364+ monitorCtx,
365+ docsToQueries,
366+ idQueryMap,
367+ ! dryrun && monitor.id != Monitor .NO_ID ,
368+ )
367369
368- if (triggerResult.triggeredDocs.contains(it.key)) {
369- findingDocPairs.add(Pair (findingId, it.key))
370+ findingToDocPairs.forEach {
371+ // Only pick those entries whose docs have triggers associated with them
372+ if (triggerResult.triggeredDocs.contains(it.second)) {
373+ triggerFindingDocPairs.add(Pair (it.first, it.second))
370374 }
371375 }
372376
373377 val actionCtx = triggerCtx.copy(
374378 triggeredDocs = triggerResult.triggeredDocs,
375- relatedFindings = findings ,
379+ relatedFindings = findingToDocPairs.map { it.first } ,
376380 error = monitorResult.error ? : triggerResult.error
377381 )
378382
379383 val alerts = mutableListOf<Alert >()
380- findingDocPairs .forEach {
384+ triggerFindingDocPairs .forEach {
381385 val alert = monitorCtx.alertService!! .composeDocLevelAlert(
382386 listOf (it.first),
383387 listOf (it.second),
@@ -427,49 +431,90 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
427431 return triggerResult
428432 }
429433
434+ /* *
435+ * 1. Bulk index all findings based on shouldCreateFinding flag
436+ * 2. invoke publishFinding() to kickstart auto-correlations
437+ * 3. Returns a list of pairs for finding id to doc id
438+ */
430439 private suspend fun createFindings (
431440 monitor : Monitor ,
432441 monitorCtx : MonitorRunnerExecutionContext ,
433- docLevelQueries : List <DocLevelQuery >,
434- matchingDocId : String ,
435- shouldCreateFinding : Boolean
436- ): String {
437- // Before the "|" is the doc id and after the "|" is the index
438- val docIndex = matchingDocId.split(" |" )
442+ docsToQueries : MutableMap <String , MutableList <String >>,
443+ idQueryMap : Map <String , DocLevelQuery >,
444+ shouldCreateFinding : Boolean ,
445+ ): List <Pair <String , String >> {
439446
440- val finding = Finding (
441- id = UUID .randomUUID().toString(),
442- relatedDocIds = listOf (docIndex[0 ]),
443- correlatedDocIds = listOf (docIndex[0 ]),
444- monitorId = monitor.id,
445- monitorName = monitor.name,
446- index = docIndex[1 ],
447- docLevelQueries = docLevelQueries,
448- timestamp = Instant .now()
449- )
447+ val findingDocPairs = mutableListOf<Pair <String , String >>()
448+ val findings = mutableListOf<Finding >()
449+ val indexRequests = mutableListOf<IndexRequest >()
450450
451- val findingStr = finding.toXContent( XContentBuilder .builder( XContentType . JSON .xContent()), ToXContent . EMPTY_PARAMS ).string()
452- logger.debug( " Findings: $findingStr " )
451+ docsToQueries.forEach {
452+ val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId] !! }
453453
454- if (shouldCreateFinding) {
455- val indexRequest = IndexRequest (monitor.dataSources.findingsIndex)
456- .setRefreshPolicy(WriteRequest .RefreshPolicy .IMMEDIATE )
457- .source(findingStr, XContentType .JSON )
458- .id(finding.id)
459- .routing(finding.id)
454+ // Before the "|" is the doc id and after the "|" is the index
455+ val docIndex = it.key.split(" |" )
460456
461- monitorCtx.client!! .suspendUntil<Client , IndexResponse > {
462- monitorCtx.client!! .index(indexRequest, it)
457+ val finding = Finding (
458+ id = UUID .randomUUID().toString(),
459+ relatedDocIds = listOf (docIndex[0 ]),
460+ correlatedDocIds = listOf (docIndex[0 ]),
461+ monitorId = monitor.id,
462+ monitorName = monitor.name,
463+ index = docIndex[1 ],
464+ docLevelQueries = triggeredQueries,
465+ timestamp = Instant .now(),
466+ )
467+ findingDocPairs.add(Pair (finding.id, it.key))
468+ findings.add(finding)
469+
470+ val findingStr =
471+ finding.toXContent(XContentBuilder .builder(XContentType .JSON .xContent()), ToXContent .EMPTY_PARAMS )
472+ .string()
473+ logger.debug(" Findings: $findingStr " )
474+
475+ if (shouldCreateFinding) {
476+ indexRequests + = IndexRequest (monitor.dataSources.findingsIndex)
477+ .source(findingStr, XContentType .JSON )
478+ .id(finding.id)
479+ .opType(DocWriteRequest .OpType .CREATE )
463480 }
464481 }
465482
483+ if (indexRequests.isNotEmpty()) {
484+ bulkIndexFindings(monitor, monitorCtx, indexRequests)
485+ }
486+
466487 try {
467- publishFinding(monitor, monitorCtx, finding)
488+ findings.forEach { finding ->
489+ publishFinding(monitor, monitorCtx, finding)
490+ }
468491 } catch (e: Exception ) {
469492 // suppress exception
470493 logger.error(" Optional finding callback failed" , e)
471494 }
472- return finding.id
495+ return findingDocPairs
496+ }
497+
498+ private suspend fun bulkIndexFindings (
499+ monitor : Monitor ,
500+ monitorCtx : MonitorRunnerExecutionContext ,
501+ indexRequests : List <IndexRequest >
502+ ) {
503+ indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
504+ val bulkResponse: BulkResponse = monitorCtx.client!! .suspendUntil {
505+ bulk(BulkRequest ().add(batch), it)
506+ }
507+ if (bulkResponse.hasFailures()) {
508+ bulkResponse.items.forEach { item ->
509+ if (item.isFailed) {
510+ logger.error(" Failed indexing the finding ${item.id} of monitor [${monitor.id} ]" )
511+ }
512+ }
513+ } else {
514+ logger.debug(" [${bulkResponse.items.size} ] All findings successfully indexed." )
515+ }
516+ }
517+ monitorCtx.client!! .execute(RefreshAction .INSTANCE , RefreshRequest (monitor.dataSources.findingsIndex))
473518 }
474519
475520 private fun publishFinding (
@@ -592,7 +637,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
592637 matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
593638 }
594639 } catch (e: Exception ) {
595- logger.warn (" Failed to run for shard $shard . Error: ${e.message} " )
640+ logger.error (" Failed to run for shard $shard . Error: ${e.message} " )
596641 }
597642 }
598643 return matchingDocs
0 commit comments