alerting/build.gradle (1 addition, 0 deletions)

@@ -113,6 +113,7 @@ dependencies {
     testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
     testImplementation "org.mockito:mockito-core:${versions.mockito}"
     testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
+    testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}"
 }

 javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
DocumentLevelMonitorRunner.kt

@@ -6,6 +6,7 @@
 package org.opensearch.alerting

 import org.apache.logging.log4j.LogManager
+import org.opensearch.ExceptionsHelper
 import org.opensearch.OpenSearchStatusException
 import org.opensearch.action.index.IndexRequest
 import org.opensearch.action.index.IndexResponse
@@ -186,8 +187,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             }
             monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
         } catch (e: Exception) {
-            logger.error("Failed to start Document-level-monitor ${monitor.name}. Error: ${e.message}", e)
-            val alertingException = AlertingException.wrap(e)
+            logger.error("Failed to start Document-level-monitor ${monitor.name}", e)
+            val alertingException = AlertingException(
+                ExceptionsHelper.unwrapCause(e).cause?.message.toString(),
+                RestStatus.INTERNAL_SERVER_ERROR,
+                e
+            )
             monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
         }

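Note on the error-handling change above: AlertingException.wrap(e) produced a generic wrapped message, while the new code unwraps transport-layer wrappers via ExceptionsHelper so the root cause reaches the monitor result. A minimal sketch of the unwrapping idea, assuming OpenSearch core on the classpath (illustrative helper, not from this PR; unlike the code above, it falls back to the outer message rather than risking the literal string "null" that cause?.message.toString() can produce when there is no deeper cause):

    import org.opensearch.ExceptionsHelper

    // Illustrative helper: surface the most specific failure message available.
    fun rootCauseMessage(e: Exception): String {
        // unwrapCause() peels OpenSearch wrapper exceptions (e.g. the remote
        // transport wrapper) off the chain before we read the message.
        val unwrapped = ExceptionsHelper.unwrapCause(e)
        return unwrapped.cause?.message ?: unwrapped.message ?: e.toString()
    }
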
@@ -390,25 +395,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         }
     }

-    suspend fun createRunContext(
-        clusterService: ClusterService,
-        client: Client,
-        index: String,
-        createdRecently: Boolean = false
-    ): HashMap<String, Any> {
-        val lastRunContext = HashMap<String, Any>()
-        lastRunContext["index"] = index
-        val count = getShardsCount(clusterService, index)
-        lastRunContext["shards_count"] = count
-
-        for (i: Int in 0 until count) {
-            val shard = i.toString()
-            val maxSeqNo: Long = if (createdRecently) -1L else getMaxSeqNo(client, index, shard)
-            lastRunContext[shard] = maxSeqNo
-        }
-        return lastRunContext
-    }
-
     // Checks whether the index was created since the last execution run, or since the monitor
     // was last updated, to ensure a new index is monitored from its beginning
     private fun createdRecently(
@@ -564,15 +550,46 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         return hits.map { hit ->
             val sourceMap = hit.sourceAsMap

-            var xContentBuilder = XContentFactory.jsonBuilder().startObject()
-            sourceMap.forEach { (k, v) ->
-                xContentBuilder = xContentBuilder.field("${k}_${index}_$monitorId", v)
-            }
-            xContentBuilder = xContentBuilder.endObject()
+            transformDocumentFieldNames(sourceMap, "_${index}_$monitorId")
+
+            var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

             val sourceRef = BytesReference.bytes(xContentBuilder)

+            logger.debug("Document [${hit.id}] payload after transform: ${sourceRef.utf8ToString()}")
+
             Pair(hit.id, sourceRef)
         }
     }
+
+    /**
+     * Traverses the document's leaf fields recursively and appends [fieldNameSuffix] to each field name.
+     *
+     * Example, for index name my_log_index and monitor id TReewWdsf2gdJFV:
+     *   { "a": { "b": 1234 } }
+     * is transformed into
+     *   { "a": { "b_my_log_index_TReewWdsf2gdJFV": 1234 } }
+     *
+     * @param jsonAsMap Input JSON (as a mutable Map)
+     * @param fieldNameSuffix Suffix appended to every existing leaf field name
+     */
+    private fun transformDocumentFieldNames(
+        jsonAsMap: MutableMap<String, Any>,
+        fieldNameSuffix: String
+    ) {
+        val tempMap = mutableMapOf<String, Any>()
+        val it: MutableIterator<Map.Entry<String, Any>> = jsonAsMap.entries.iterator()
+        while (it.hasNext()) {
+            val entry = it.next()
+            if (entry.value is Map<*, *>) {
+                transformDocumentFieldNames(entry.value as MutableMap<String, Any>, fieldNameSuffix)
+            } else if (entry.key.endsWith(fieldNameSuffix) == false) {
+                tempMap["${entry.key}$fieldNameSuffix"] = entry.value
+                it.remove()
+            }
+        }
+        jsonAsMap.putAll(tempMap)
+    }
 }
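The transform above renames only leaf fields; keys whose values are themselves objects are left intact, so the nested shape is preserved while each leaf field name becomes unique per index/monitor pair. A self-contained sketch of the same algorithm in plain Kotlin (illustrative names, not the plugin's API):

    // Sketch of the leaf-renaming algorithm used by transformDocumentFieldNames.
    fun transform(doc: MutableMap<String, Any>, suffix: String) {
        val renamed = mutableMapOf<String, Any>()
        val iter = doc.entries.iterator()
        while (iter.hasNext()) {
            val entry = iter.next()
            val value = entry.value
            if (value is MutableMap<*, *>) {
                // Object-valued keys are not renamed; recurse into them instead.
                @Suppress("UNCHECKED_CAST")
                transform(value as MutableMap<String, Any>, suffix)
            } else if (!entry.key.endsWith(suffix)) {
                // Leaf: stage the suffixed key and drop the original.
                renamed["${entry.key}$suffix"] = value
                iter.remove()
            }
        }
        doc.putAll(renamed)
    }

    fun main() {
        val doc: MutableMap<String, Any> = mutableMapOf(
            "source" to mutableMapOf<String, Any>(
                "device" to mutableMapOf<String, Any>("port" to 12345)
            )
        )
        transform(doc, "_my_log_index_TReewWdsf2gdJFV")
        // Prints {source={device={port_my_log_index_TReewWdsf2gdJFV=12345}}}
        println(doc)
    }
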
MonitorDataSourcesIT.kt

@@ -14,6 +14,7 @@ import org.opensearch.action.admin.indices.get.GetIndexResponse
 import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
 import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
 import org.opensearch.action.admin.indices.refresh.RefreshRequest
+import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.support.WriteRequest
 import org.opensearch.alerting.action.SearchMonitorAction

@@ -209,6 +210,117 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
         assertEquals("Didn't match all 8 queries", 8, findings[0].docLevelQueries.size)
     }

+    fun `test execute monitor with non-flattened json doc as source`() {
+        val docQuery1 = DocLevelQuery(query = "source.device.port:12345 OR source.device.hwd.id:12345", name = "3")
+
+        val docLevelInput = DocLevelMonitorInput(
+            "description", listOf(index), listOf(docQuery1)
+        )
+        val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
+        val customFindingsIndex = "custom_findings_index"
+        val customFindingsIndexPattern = "custom_findings_index-1"
+        val customQueryIndex = "custom_alerts_index"
+        var monitor = randomDocumentLevelMonitor(
+            inputs = listOf(docLevelInput),
+            triggers = listOf(trigger),
+            dataSources = DataSources(
+                queryIndex = customQueryIndex,
+                findingsIndex = customFindingsIndex,
+                findingsIndexPattern = customFindingsIndexPattern
+            )
+        )
+        val monitorResponse = createMonitor(monitor)
+
+        val mappings = """{
+            "properties": {
+                "source.device.port": { "type": "long" },
+                "source.device.hwd.id": { "type": "long" },
+                "nested_field": {
+                    "type": "nested",
+                    "properties": {
+                        "test1": {
+                            "type": "keyword"
+                        }
+                    }
+                },
+                "my_join_field": {
+                    "type": "join",
+                    "relations": {
+                        "question": "answer"
+                    }
+                }
+            }
+        }"""
+
+        client().admin().indices().putMapping(PutMappingRequest(index).source(mappings, XContentType.JSON)).get()
+        val getFieldCapabilitiesResp = client().fieldCaps(FieldCapabilitiesRequest().indices(index).fields("*")).get()
+        assertTrue(getFieldCapabilitiesResp.getField("source").containsKey("object"))
+        assertTrue(getFieldCapabilitiesResp.getField("source.device").containsKey("object"))
+        assertTrue(getFieldCapabilitiesResp.getField("source.device.hwd").containsKey("object"))
+        // Testing both nested and flattened (dot-notation) documents
+        val testDocuments = mutableListOf<String>()
+        testDocuments += """{
+            "source" : { "device": {"port" : 12345 } },
+            "nested_field": { "test1": "some text" }
+        }"""
+        testDocuments += """{
+            "source.device.port" : "12345"
+        }"""
+        testDocuments += """{
+            "source.device.port" : 12345
+        }"""
+        testDocuments += """{
+            "source" : { "device": {"hwd": { "id": 12345 } } }
+        }"""
+        testDocuments += """{
+            "source.device.hwd.id" : 12345
+        }"""
+        // Document with join field
+        testDocuments += """{
+            "source" : { "device" : { "hwd": { "id" : 12345 } } },
+            "my_join_field": { "name": "question" }
+        }"""
+        // Checking that these degenerate but valid documents don't cause any issues
+        testDocuments += """{
+            "source" : {}
+        }"""
+        testDocuments += """{
+            "source.device" : null
+        }"""
+        testDocuments += """{
+            "source.device" : {}
+        }"""
+        testDocuments += """{
+            "source.device.hwd" : {}
+        }"""
+        testDocuments += """{
+            "source.device.hwd.id" : null
+        }"""
+        testDocuments += """{
+            "some.multi.val.field" : [12345, 10, 11]
+        }"""
+        // Insert all documents
+        for (i in testDocuments.indices) {
+            indexDoc(index, "$i", testDocuments[i])
+        }
+        assertFalse(monitorResponse?.id.isNullOrEmpty())
+        monitor = monitorResponse!!.monitor
+        val id = monitorResponse.id
+        val executeMonitorResponse = executeMonitor(monitor, id, false)
+        Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
+        Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
+        searchAlerts(id)
+        val table = Table("asc", "id", null, 1, 0, "")
+        var getAlertsResponse = client()
+            .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
+            .get()
+        Assert.assertTrue(getAlertsResponse != null)
+        Assert.assertTrue(getAlertsResponse.alerts.size == 1)
+        val findings = searchFindings(id, customFindingsIndex)
+        assertEquals("Findings saved for test monitor", 6, findings.size)
+        assertEquals("Didn't match query", 1, findings[0].docLevelQueries.size)
+    }
+
     fun `test execute monitor with custom query index old`() {
         val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
         val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
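For readers tallying the assertions in the new test: after field-name normalization, exactly the first six documents satisfy the query, which is where the expected finding count comes from. A quick sketch of that bookkeeping (hypothetical, not part of the test; indices refer to testDocuments in insertion order):

    // Docs 0-2 match source.device.port:12345 (nested object; dotted key with a
    // string value that the long mapping coerces; dotted key with a long value),
    // docs 3-5 match source.device.hwd.id:12345 (nested; dotted; join-field doc),
    // docs 6-11 hold only empty objects, nulls, or an unrelated multi-value field.
    val expectedMatches = setOf(0, 1, 2, 3, 4, 5)
    check(expectedMatches.size == 6) // the "Findings saved for test monitor" assertion
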
AlertingSingleNodeTestCase.kt

@@ -38,6 +38,7 @@ import org.opensearch.commons.alerting.model.Table
 import org.opensearch.index.query.TermQueryBuilder
 import org.opensearch.index.reindex.ReindexPlugin
 import org.opensearch.index.seqno.SequenceNumbers
+import org.opensearch.join.ParentJoinPlugin
 import org.opensearch.plugins.Plugin
 import org.opensearch.rest.RestRequest
 import org.opensearch.search.builder.SearchSourceBuilder
@@ -229,7 +230,7 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() {
         ).get()

     override fun getPlugins(): List<Class<out Plugin>> {
-        return listOf(AlertingPlugin::class.java, ReindexPlugin::class.java)
+        return listOf(AlertingPlugin::class.java, ReindexPlugin::class.java, ParentJoinPlugin::class.java)
     }

     override fun resetNodeAfterTest(): Boolean {
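How the pieces of this PR fit together: the new test maps a join field, join-type mappings are provided by the parent-join module, so the single-node test case must register ParentJoinPlugin here and build.gradle must pull in parent-join-client. A hypothetical minimal failure without the plugin (reusing the test fixture names from above; the exact error message is an assumption):

    // Without ParentJoinPlugin registered, no mapper handles the "join" type,
    // so creating the mapping is expected to fail with an error along the lines
    // of: No handler for type [join] declared on field [my_join_field]
    val joinMapping = """{ "properties": { "my_join_field": {
        "type": "join", "relations": { "question": "answer" } } } }"""
    client().admin().indices()
        .putMapping(PutMappingRequest(index).source(joinMapping, XContentType.JSON))
        .get()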