diff --git a/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt b/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt index 7051e798..c166a014 100644 --- a/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt +++ b/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt @@ -9,6 +9,7 @@ import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.DocWriteResponse import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.delete.DeleteRequest @@ -21,6 +22,7 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.TimeValue @@ -46,11 +48,13 @@ import org.opensearch.notifications.model.NotificationConfigDoc import org.opensearch.notifications.model.NotificationConfigDocInfo import org.opensearch.notifications.settings.PluginSettings import org.opensearch.notifications.util.SecureIndexClient +import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntil import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntilTimeout import org.opensearch.script.Script import org.opensearch.search.SearchHit import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder +import java.lang.IllegalArgumentException import java.util.concurrent.TimeUnit /** @@ -58,11 +62,22 @@ import java.util.concurrent.TimeUnit */ @Suppress("TooManyFunctions") internal object NotificationConfigIndex : ConfigOperations { + const val DEFAULT_SCHEMA_VERSION = 1 + const val _META = "_meta" + const val SCHEMA_VERSION = "schema_version" + private val log by logger(NotificationConfigIndex::class.java) private const val INDEX_NAME = ".opensearch-notifications-config" private const val MAPPING_FILE_NAME = "notifications-config-mapping.yml" private const val SETTINGS_FILE_NAME = "notifications-config-settings.yml" + private val indexMappingAsMap = XContentHelper.convertToMap( + XContentType.YAML.xContent(), + NotificationConfigIndex::class.java.classLoader.getResource(MAPPING_FILE_NAME)?.readText()!!, + false + ) + private val indexMappingSchemaVersion = getSchemaVersionFromIndexMapping(indexMappingAsMap) + private lateinit var client: Client private lateinit var clusterService: ClusterService @@ -92,6 +107,21 @@ internal object NotificationConfigIndex : ConfigOperations { NotificationConfigIndex.clusterService = clusterService } + private fun getSchemaVersionFromIndexMapping(indexMapping: Map?): Int { + var schemaVersion = DEFAULT_SCHEMA_VERSION + if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is Map<*, *>) { + val metaData = indexMapping[_META] as Map<*, *> + if (metaData.containsKey(SCHEMA_VERSION)) { + try { + schemaVersion = metaData[SCHEMA_VERSION] as Int + } catch (e: Exception) { + throw IllegalArgumentException("schema_version can only be Integer") + } + } + } + return schemaVersion + } + /** * Create index using the mapping and settings defined in resource */ @@ -99,8 +129,6 @@ internal object NotificationConfigIndex : ConfigOperations { private suspend fun createIndex() { if (!isIndexExists()) { val classLoader = NotificationConfigIndex::class.java.classLoader - val indexMappingSource = classLoader.getResource(MAPPING_FILE_NAME)?.readText()!! - val indexMappingAsMap = XContentHelper.convertToMap(XContentType.YAML.xContent(), indexMappingSource, false) val indexSettingsSource = classLoader.getResource(SETTINGS_FILE_NAME)?.readText()!! val request = CreateIndexRequest(INDEX_NAME) .mapping(indexMappingAsMap) @@ -121,6 +149,22 @@ internal object NotificationConfigIndex : ConfigOperations { } } } + } else { + val currentIndexMappingMetadata = clusterService.state().metadata.indices[INDEX_NAME]?.mapping()?.sourceAsMap() + val currentIndexMappingSchemaVersion = getSchemaVersionFromIndexMapping(currentIndexMappingMetadata) + if (currentIndexMappingSchemaVersion < indexMappingSchemaVersion) { + val putMappingRequest: PutMappingRequest = PutMappingRequest(INDEX_NAME).source(indexMappingAsMap) + client.threadPool().threadContext.stashContext().use { + val response: AcknowledgedResponse = client.suspendUntil { + admin().indices().putMapping(putMappingRequest, it) + } + if (response.isAcknowledged) { + log.info("$LOG_PREFIX:Index $INDEX_NAME update mapping Acknowledged") + } else { + throw IllegalStateException("$LOG_PREFIX:Index $INDEX_NAME update mapping not Acknowledged") + } + } + } } } diff --git a/notifications/notifications/src/main/resources/notifications-config-mapping.yml b/notifications/notifications/src/main/resources/notifications-config-mapping.yml index 0b2f68fa..133fc28c 100644 --- a/notifications/notifications/src/main/resources/notifications-config-mapping.yml +++ b/notifications/notifications/src/main/resources/notifications-config-mapping.yml @@ -7,6 +7,8 @@ # Schema file for the notifications-config index # "dynamic" is set to "false" so that only specified fields are indexed instead of all fields. dynamic: false +_meta: + schema_version: 1 properties: metadata: type: object diff --git a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/PluginRestTestCase.kt b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/PluginRestTestCase.kt index 2940c1a5..f3a4b3db 100644 --- a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/PluginRestTestCase.kt +++ b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/PluginRestTestCase.kt @@ -25,6 +25,7 @@ import org.opensearch.core.xcontent.DeprecationHandler import org.opensearch.core.xcontent.MediaType import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.notifications.NotificationPlugin +import org.opensearch.notifications.index.NotificationConfigIndex import org.opensearch.rest.RestRequest import org.opensearch.test.rest.OpenSearchRestTestCase import java.io.IOException @@ -364,6 +365,15 @@ abstract class PluginRestTestCase : OpenSearchRestTestCase() { updateClusterSettings(ClusterSetting("transient", "*", null)) } + protected fun getCurrentMappingsSchemaVersion(): Int { + val indexName = ".opensearch-notifications-config" + val getMappingRequest = Request(RestRequest.Method.GET.name, "$indexName/_mappings") + val response = executeRequest(getMappingRequest, RestStatus.OK.status, client()) + val mappingsObject = response.get(indexName).asJsonObject.get("mappings").asJsonObject + return mappingsObject.get(NotificationConfigIndex._META)?.asJsonObject?.get(NotificationConfigIndex.SCHEMA_VERSION)?.asInt + ?: NotificationConfigIndex.DEFAULT_SCHEMA_VERSION + } + protected class ClusterSetting(val type: String, val name: String, var value: Any?) { init { this.value = if (value == null) "null" else "\"" + value + "\"" diff --git a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt index b7797697..50e5baa9 100644 --- a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt +++ b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt @@ -6,6 +6,9 @@ package org.opensearch.integtest.config import org.junit.Assert +import org.opensearch.client.Request +import org.opensearch.client.ResponseException +import org.opensearch.client.WarningFailureException import org.opensearch.commons.notifications.model.Chime import org.opensearch.commons.notifications.model.ConfigType import org.opensearch.commons.notifications.model.MethodType @@ -16,6 +19,7 @@ import org.opensearch.commons.notifications.model.Webhook import org.opensearch.core.rest.RestStatus import org.opensearch.integtest.PluginRestTestCase import org.opensearch.notifications.NotificationPlugin.Companion.PLUGIN_BASE_URI +import org.opensearch.notifications.index.NotificationConfigIndex import org.opensearch.notifications.verifySingleConfigEquals import org.opensearch.rest.RestRequest @@ -217,4 +221,116 @@ class CreateNotificationConfigIT : PluginRestTestCase() { RestStatus.BAD_REQUEST.status ) } + + fun `test automatically update mappings if encountering higher schema_version`() { + val indexName = ".opensearch-notifications-config" + val deleteIndexRequest = Request(RestRequest.Method.DELETE.name, indexName) + try { + adminClient().performRequest(deleteIndexRequest) + } catch (e: ResponseException) { + /* ignore if the index has not been created */ + assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode)) + } + + val pseudoMappingString = """ + { + "mappings": { + "_meta":{ + "schema_version":0 + }, + "dynamic": "false", + "properties": { + "config": { + "properties": { + "chime": { + "properties": { + "url": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + } + } + } + } + } + } + } + } + """.trimIndent() + try { + executeRequest( + RestRequest.Method.PUT.name, + indexName, + pseudoMappingString, + RestStatus.OK.status + ) + } catch (e: Exception) { + /* ignore warnings */ + assert(e is WarningFailureException) + } + + Assert.assertEquals(0, getCurrentMappingsSchemaVersion()) + createConfig() + Assert.assertEquals(1, getCurrentMappingsSchemaVersion()) + } + + fun `test _meta field not exists in current mappings`() { + val indexName = ".opensearch-notifications-config" + val deleteIndexRequest = Request(RestRequest.Method.DELETE.name, indexName) + try { + adminClient().performRequest(deleteIndexRequest) + } catch (e: ResponseException) { + /* ignore if the index has not been created */ + assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode)) + } + + val pseudoMappingString = """ + { + "mappings": { + "dynamic": "false", + "properties": { + "config": { + "properties": { + "chime": { + "properties": { + "url": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + } + } + } + } + } + } + } + } + """.trimIndent() + try { + executeRequest( + RestRequest.Method.PUT.name, + indexName, + pseudoMappingString, + RestStatus.OK.status + ) + } catch (e: Exception) { + /* ignore warnings */ + assert(e is WarningFailureException) + } + + val getMappingRequest = Request(RestRequest.Method.GET.name, "$indexName/_mappings") + val responseBefore = executeRequest(getMappingRequest, RestStatus.OK.status, client()) + val mappingsObjectBefore = responseBefore.get(indexName).asJsonObject.get("mappings").asJsonObject + Assert.assertNull("mappings should not have _meta field", mappingsObjectBefore.get(NotificationConfigIndex._META)) + createConfig() + val responseAfter = executeRequest(getMappingRequest, RestStatus.OK.status, client()) + val mappingsObjectAfter = responseAfter.get(indexName).asJsonObject.get("mappings").asJsonObject + Assert.assertEquals(mappingsObjectAfter, mappingsObjectBefore) + } } diff --git a/notifications/notifications/src/test/kotlin/org/opensearch/notifications/index/NotificationConfigIndexTests.kt b/notifications/notifications/src/test/kotlin/org/opensearch/notifications/index/NotificationConfigIndexTests.kt new file mode 100644 index 00000000..6cb8c60a --- /dev/null +++ b/notifications/notifications/src/test/kotlin/org/opensearch/notifications/index/NotificationConfigIndexTests.kt @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.notifications.index + +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import java.lang.reflect.Method +import kotlin.test.assertEquals +import kotlin.test.assertFails + +class NotificationConfigIndexTests { + + @Test + fun `test get schema version`() { + val indexMapping = mapOf( + "_meta" to mapOf("schema_version" to 10), + "user" to "test" + ) + val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping) + assertEquals(10, schemaVersion, "wrong schema version") + } + + @Test + fun `test get schema version without _meta field`() { + val indexMapping = mapOf( + "meta" to mapOf("schema_version" to 10), + "user" to "test" + ) + val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping) + assertEquals(1, schemaVersion, "wrong schema version") + } + + @Test + fun `test get schema version without schema_version field`() { + val indexMapping = mapOf( + "_meta" to mapOf("schema" to 10), + "user" to "test" + ) + val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping) + assertEquals(1, schemaVersion, "wrong schema version") + } + + @Test + fun `test get non number schema_version throw exception`() { + val indexMapping = mapOf( + "_meta" to mapOf("schema_version" to "10"), + "user" to "test" + ) + assertFails { + getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping) + } + } + + companion object { + private lateinit var getSchemaVersionFromIndexMapping: Method + + @BeforeAll + @JvmStatic + fun initialize() { + /* use reflection to get private method */ + getSchemaVersionFromIndexMapping = NotificationConfigIndex::class.java.getDeclaredMethod( + "getSchemaVersionFromIndexMapping", Map::class.java + ) + + getSchemaVersionFromIndexMapping.isAccessible = true + } + } +}