diff --git a/alerting/build.gradle b/alerting/build.gradle index e562221f8..ba2189b54 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -113,6 +113,7 @@ dependencies { testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "org.mockito:mockito-core:${versions.mockito}" testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}" + testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}" } javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index acd3ded15..47cfc8879 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse @@ -186,8 +187,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) } catch (e: Exception) { - logger.error("Failed to start Document-level-monitor ${monitor.name}. Error: ${e.message}", e) - val alertingException = AlertingException.wrap(e) + logger.error("Failed to start Document-level-monitor ${monitor.name}", e) + val alertingException = AlertingException( + ExceptionsHelper.unwrapCause(e).cause?.message.toString(), + RestStatus.INTERNAL_SERVER_ERROR, + e + ) monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) } @@ -390,25 +395,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } - suspend fun createRunContext( - clusterService: ClusterService, - client: Client, - index: String, - createdRecently: Boolean = false - ): HashMap { - val lastRunContext = HashMap() - lastRunContext["index"] = index - val count = getShardsCount(clusterService, index) - lastRunContext["shards_count"] = count - - for (i: Int in 0 until count) { - val shard = i.toString() - val maxSeqNo: Long = if (createdRecently) -1L else getMaxSeqNo(client, index, shard) - lastRunContext[shard] = maxSeqNo - } - return lastRunContext - } - // Checks if the index was created from the last execution run or when the monitor was last updated to ensure that // new index is monitored from the beginning of that index private fun createdRecently( @@ -564,15 +550,46 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return hits.map { hit -> val sourceMap = hit.sourceAsMap - var xContentBuilder = XContentFactory.jsonBuilder().startObject() - sourceMap.forEach { (k, v) -> - xContentBuilder = xContentBuilder.field("${k}_${index}_$monitorId", v) - } - xContentBuilder = xContentBuilder.endObject() + transformDocumentFieldNames(sourceMap, "_${index}_$monitorId") + + var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) val sourceRef = BytesReference.bytes(xContentBuilder) + logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString()) + Pair(hit.id, sourceRef) } } + + /** + * Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names. + * + * Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV: + * { { + * "a": { "a": { + * "b": 1234 ----> "b_my_log_index_TReewWdsf2gdJFV": 1234 + * } } + * } + * + * @param jsonAsMap Input JSON (as Map) + * @param fieldNameSuffix Field suffix which is appended to existing field name + */ + private fun transformDocumentFieldNames( + jsonAsMap: MutableMap, + fieldNameSuffix: String + ) { + val tempMap = mutableMapOf() + val it: MutableIterator> = jsonAsMap.entries.iterator() + while (it.hasNext()) { + val entry = it.next() + if (entry.value is Map<*, *>) { + transformDocumentFieldNames(entry.value as MutableMap, fieldNameSuffix) + } else if (entry.key.endsWith(fieldNameSuffix) == false) { + tempMap["${entry.key}$fieldNameSuffix"] = entry.value + it.remove() + } + } + jsonAsMap.putAll(tempMap) + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index aa8d50fab..13a22f657 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -14,6 +14,7 @@ import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.action.SearchMonitorAction @@ -209,6 +210,117 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Didn't match all 8 queries", 8, findings[0].docLevelQueries.size) } + fun `test execute monitor with non-flattened json doc as source`() { + val docQuery1 = DocLevelQuery(query = "source.device.port:12345 OR source.device.hwd.id:12345", name = "3") + + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + + val mappings = """{ + "properties": { + "source.device.port": { "type": "long" }, + "source.device.hwd.id": { "type": "long" }, + "nested_field": { + "type": "nested", + "properties": { + "test1": { + "type": "keyword" + } + } + }, + "my_join_field": { + "type": "join", + "relations": { + "question": "answer" + } + } + } + }""" + + client().admin().indices().putMapping(PutMappingRequest(index).source(mappings, XContentType.JSON)).get() + val getFieldCapabilitiesResp = client().fieldCaps(FieldCapabilitiesRequest().indices(index).fields("*")).get() + assertTrue(getFieldCapabilitiesResp.getField("source").containsKey("object")) + assertTrue(getFieldCapabilitiesResp.getField("source.device").containsKey("object")) + assertTrue(getFieldCapabilitiesResp.getField("source.device.hwd").containsKey("object")) + // testing both, nested and flatten documents + val testDocuments = mutableListOf() + testDocuments += """{ + "source" : { "device": {"port" : 12345 } }, + "nested_field": { "test1": "some text" } + }""" + testDocuments += """{ + "source.device.port" : "12345" + }""" + testDocuments += """{ + "source.device.port" : 12345 + }""" + testDocuments += """{ + "source" : { "device": {"hwd": { "id": 12345 } } } + }""" + testDocuments += """{ + "source.device.hwd.id" : 12345 + }""" + // Document with join field + testDocuments += """{ + "source" : { "device" : { "hwd": { "id" : 12345 } } }, + "my_join_field": { "name": "question" } + }""" + // Checking if these pointless but valid documents cause any issues + testDocuments += """{ + "source" : {} + }""" + testDocuments += """{ + "source.device" : null + }""" + testDocuments += """{ + "source.device" : {} + }""" + testDocuments += """{ + "source.device.hwd" : {} + }""" + testDocuments += """{ + "source.device.hwd.id" : null + }""" + testDocuments += """{ + "some.multi.val.field" : [12345, 10, 11] + }""" + // Insert all documents + for (i in testDocuments.indices) { + indexDoc(index, "$i", testDocuments[i]) + } + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 6, findings.size) + assertEquals("Didn't match query", 1, findings[0].docLevelQueries.size) + } + fun `test execute monitor with custom query index old`() { val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt index a57e8fa33..7bfbcf4ac 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt @@ -38,6 +38,7 @@ import org.opensearch.commons.alerting.model.Table import org.opensearch.index.query.TermQueryBuilder import org.opensearch.index.reindex.ReindexPlugin import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.join.ParentJoinPlugin import org.opensearch.plugins.Plugin import org.opensearch.rest.RestRequest import org.opensearch.search.builder.SearchSourceBuilder @@ -229,7 +230,7 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { ).get() override fun getPlugins(): List> { - return listOf(AlertingPlugin::class.java, ReindexPlugin::class.java) + return listOf(AlertingPlugin::class.java, ReindexPlugin::class.java, ParentJoinPlugin::class.java) } override fun resetNodeAfterTest(): Boolean {