-
Notifications
You must be signed in to change notification settings - Fork 119
Bulk index findings and sequentially invoke auto-correlations #1355
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
e7ba6d7
28530dd
03b86ae
ae32748
1833d6d
3bd7888
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,8 +8,10 @@ package org.opensearch.alerting | |
| import org.apache.logging.log4j.LogManager | ||
| import org.opensearch.ExceptionsHelper | ||
| import org.opensearch.OpenSearchStatusException | ||
| import org.opensearch.action.DocWriteRequest | ||
| import org.opensearch.action.bulk.BulkRequest | ||
| import org.opensearch.action.bulk.BulkResponse | ||
| import org.opensearch.action.index.IndexRequest | ||
| import org.opensearch.action.index.IndexResponse | ||
| import org.opensearch.action.search.SearchAction | ||
| import org.opensearch.action.search.SearchRequest | ||
| import org.opensearch.action.search.SearchResponse | ||
|
|
@@ -22,6 +24,7 @@ import org.opensearch.alerting.model.MonitorRunResult | |
| import org.opensearch.alerting.model.userErrorMessage | ||
| import org.opensearch.alerting.opensearchapi.suspendUntil | ||
| import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext | ||
| import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE | ||
| import org.opensearch.alerting.util.AlertingException | ||
| import org.opensearch.alerting.util.IndexUtils | ||
| import org.opensearch.alerting.util.defaultToPerExecutionAction | ||
|
|
@@ -273,10 +276,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
| // If there are no triggers defined, we still want to generate findings | ||
| if (monitor.triggers.isEmpty()) { | ||
| if (dryrun == false && monitor.id != Monitor.NO_ID) { | ||
| docsToQueries.forEach { | ||
| val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } | ||
| createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) | ||
| } | ||
| createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) | ||
| } | ||
| } else { | ||
| monitor.triggers.forEach { | ||
|
|
@@ -365,7 +365,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
| trigger: DocumentLevelTrigger, | ||
| monitor: Monitor, | ||
| idQueryMap: Map<String, DocLevelQuery>, | ||
| docsToQueries: Map<String, List<String>>, | ||
| docsToQueries: MutableMap<String, MutableList<String>>, | ||
| queryToDocIds: Map<DocLevelQuery, Set<String>>, | ||
| dryrun: Boolean, | ||
| workflowRunContext: WorkflowRunContext?, | ||
|
|
@@ -374,35 +374,34 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
| val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) | ||
| val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) | ||
|
|
||
| val findings = mutableListOf<String>() | ||
| val findingDocPairs = mutableListOf<Pair<String, String>>() | ||
| val triggerFindingDocPairs = mutableListOf<Pair<String, String>>() | ||
|
|
||
| // TODO: Implement throttling for findings | ||
| docsToQueries.forEach { | ||
| val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } | ||
| val findingId = createFindings( | ||
| monitor, | ||
| monitorCtx, | ||
| triggeredQueries, | ||
| it.key, | ||
| !dryrun && monitor.id != Monitor.NO_ID, | ||
| executionId | ||
| ) | ||
| findings.add(findingId) | ||
| val findingToDocPairs = createFindings( | ||
| monitor, | ||
| monitorCtx, | ||
| docsToQueries, | ||
| idQueryMap, | ||
| !dryrun && monitor.id != Monitor.NO_ID, | ||
| executionId | ||
| ) | ||
|
|
||
| if (triggerResult.triggeredDocs.contains(it.key)) { | ||
| findingDocPairs.add(Pair(findingId, it.key)) | ||
| findingToDocPairs.forEach { | ||
| // Only pick those entries whose docs have triggers associated with them | ||
| if (triggerResult.triggeredDocs.contains(it.second)) { | ||
| triggerFindingDocPairs.add(Pair(it.first, it.second)) | ||
| } | ||
| } | ||
|
|
||
| val actionCtx = triggerCtx.copy( | ||
| triggeredDocs = triggerResult.triggeredDocs, | ||
| relatedFindings = findings, | ||
| // Confirm whether this is right, or whether only findings for triggered docs should be present in this list | ||
|
||
| relatedFindings = findingToDocPairs.map { it.first }, | ||
| error = monitorResult.error ?: triggerResult.error | ||
| ) | ||
|
|
||
| val alerts = mutableListOf<Alert>() | ||
| findingDocPairs.forEach { | ||
| triggerFindingDocPairs.forEach { | ||
| val alert = monitorCtx.alertService!!.composeDocLevelAlert( | ||
| listOf(it.first), | ||
| listOf(it.second), | ||
|
|
@@ -461,51 +460,102 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
| return triggerResult | ||
| } | ||
|
|
||
| /** | ||
| * 1. Bulk indexes the findings in batches when the shouldCreateFinding flag is set. | ||
| * 2. Invokes publishFinding() for each finding to kick-start auto-correlations. | ||
| * 3. Returns a list of (finding id, doc id) pairs. | ||
| */ | ||
| private suspend fun createFindings( | ||
| monitor: Monitor, | ||
| monitorCtx: MonitorRunnerExecutionContext, | ||
| docLevelQueries: List<DocLevelQuery>, | ||
| matchingDocId: String, | ||
| docsToQueries: MutableMap<String, MutableList<String>>, | ||
| idQueryMap: Map<String, DocLevelQuery>, | ||
| shouldCreateFinding: Boolean, | ||
| workflowExecutionId: String? = null, | ||
| ): String { | ||
| // Before the "|" is the doc id and after the "|" is the index | ||
| val docIndex = matchingDocId.split("|") | ||
| ): List<Pair<String, String>> { | ||
|
|
||
| val finding = Finding( | ||
| id = UUID.randomUUID().toString(), | ||
| relatedDocIds = listOf(docIndex[0]), | ||
| correlatedDocIds = listOf(docIndex[0]), | ||
| monitorId = monitor.id, | ||
| monitorName = monitor.name, | ||
| index = docIndex[1], | ||
| docLevelQueries = docLevelQueries, | ||
| timestamp = Instant.now(), | ||
| executionId = workflowExecutionId | ||
| ) | ||
| val findingDocPairs = mutableListOf<Pair<String, String>>() | ||
| val findings = mutableListOf<Finding>() | ||
| val indexRequests = mutableListOf<IndexRequest>() | ||
| monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) | ||
|
||
| monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) { | ||
|
||
| monitorCtx.findingsIndexBatchSize = it | ||
| } | ||
|
|
||
| val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() | ||
| logger.debug("Findings: $findingStr") | ||
| docsToQueries.forEach { | ||
| val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } | ||
|
|
||
| if (shouldCreateFinding) { | ||
| val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) | ||
| .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
| .source(findingStr, XContentType.JSON) | ||
| .id(finding.id) | ||
| .routing(finding.id) | ||
| // Before the "|" is the doc id and after the "|" is the index | ||
| val docIndex = it.key.split("|") | ||
|
|
||
| monitorCtx.client!!.suspendUntil<Client, IndexResponse> { | ||
| monitorCtx.client!!.index(indexRequest, it) | ||
| val finding = Finding( | ||
| id = UUID.randomUUID().toString(), | ||
| relatedDocIds = listOf(docIndex[0]), | ||
| correlatedDocIds = listOf(docIndex[0]), | ||
| monitorId = monitor.id, | ||
| monitorName = monitor.name, | ||
| index = docIndex[1], | ||
| docLevelQueries = triggeredQueries, | ||
| timestamp = Instant.now(), | ||
| executionId = workflowExecutionId | ||
| ) | ||
| findingDocPairs.add(Pair(finding.id, it.key)) | ||
| findings.add(finding) | ||
|
|
||
| val findingStr = | ||
| finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) | ||
| .string() | ||
| logger.debug("Findings: $findingStr") | ||
|
|
||
| if (indexRequests.size > monitorCtx.findingsIndexBatchSize) { | ||
| // make bulk indexing call here and flush the indexRequest object | ||
| bulkIndexFindings(monitor, monitorCtx, indexRequests) | ||
| indexRequests.clear() | ||
| } else { | ||
| if (shouldCreateFinding) { | ||
| indexRequests += IndexRequest(monitor.dataSources.findingsIndex) | ||
| .source(findingStr, XContentType.JSON) | ||
| .id(finding.id) | ||
| .routing(finding.id) | ||
| .opType(DocWriteRequest.OpType.INDEX) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) { | ||
| bulkIndexFindings(monitor, monitorCtx, indexRequests) | ||
| } | ||
|
|
||
| try { | ||
| publishFinding(monitor, monitorCtx, finding) | ||
| findings.forEach { finding -> | ||
| publishFinding(monitor, monitorCtx, finding) | ||
| } | ||
| } catch (e: Exception) { | ||
| // suppress exception | ||
| logger.error("Optional finding callback failed", e) | ||
| } | ||
| return finding.id | ||
| return findingDocPairs | ||
| } | ||
|
|
||
| private suspend fun bulkIndexFindings( | ||
| monitor: Monitor, | ||
| monitorCtx: MonitorRunnerExecutionContext, | ||
| indexRequests: List<IndexRequest> | ||
| ) { | ||
| if (indexRequests.isNotEmpty()) { | ||
| val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { | ||
goyamegh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) | ||
|
||
| } | ||
| if (bulkResponse.hasFailures()) { | ||
| bulkResponse.items.forEach { item -> | ||
| if (item.isFailed) { | ||
goyamegh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") | ||
| } | ||
| } | ||
| } else { | ||
| logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private fun publishFinding( | ||
|
|
@@ -629,7 +679,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
| matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) | ||
| } | ||
| } catch (e: Exception) { | ||
| logger.warn("Failed to run for shard $shard. Error: ${e.message}") | ||
| logger.error("Failed to run for shard $shard. Error: ${e.message}") | ||
| } | ||
| } | ||
| return matchingDocs | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ class AlertingSettings { | |
|
|
||
| companion object { | ||
| const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L | ||
| const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000 | ||
goyamegh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| val ALERTING_MAX_MONITORS = Setting.intSetting( | ||
| "plugins.alerting.monitor.max_monitors", | ||
|
|
@@ -153,5 +154,12 @@ class AlertingSettings { | |
| -1L, | ||
| Setting.Property.NodeScope, Setting.Property.Dynamic | ||
| ) | ||
|
|
||
| val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( | ||
| "plugins.alerting.alert_findings_indexing_batch_size", | ||
| DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, | ||
| 0, | ||
|
||
| Setting.Property.NodeScope, Setting.Property.Dynamic | ||
| ) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: BULK