From 28c403f5da53e937cc5f556b46443f9491606bd4 Mon Sep 17 00:00:00 2001 From: zhichao-aws Date: Mon, 3 Jul 2023 14:51:15 +0800 Subject: [PATCH 1/5] add auto upgrade mapping logic Signed-off-by: zhichao-aws --- .../index/NotificationConfigIndex.kt | 52 +++++++++++++- .../index/NotificationConfigIndexTests.kt | 71 +++++++++++++++++++ 2 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 notifications/notifications/src/test/kotlin/org/opensearch/notifications/index/NotificationConfigIndexTests.kt diff --git a/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt b/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt index fe76dcf6..372aa878 100644 --- a/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt +++ b/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt @@ -9,6 +9,7 @@ import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.DocWriteResponse import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.delete.DeleteRequest @@ -21,6 +22,7 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.TimeValue @@ -45,12 +47,14 @@ import org.opensearch.notifications.model.NotificationConfigDoc import org.opensearch.notifications.model.NotificationConfigDocInfo import org.opensearch.notifications.settings.PluginSettings import org.opensearch.notifications.util.SecureIndexClient +import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntil import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntilTimeout import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.search.SearchHit import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder +import java.lang.IllegalArgumentException import java.util.concurrent.TimeUnit /** @@ -62,6 +66,9 @@ internal object NotificationConfigIndex : ConfigOperations { private const val INDEX_NAME = ".opensearch-notifications-config" private const val MAPPING_FILE_NAME = "notifications-config-mapping.yml" private const val SETTINGS_FILE_NAME = "notifications-config-settings.yml" + private const val DEFAULT_SCHEMA_VERSION = 1 + private const val _META = "_meta" + private const val SCHEMA_VERSION = "schema_version" private lateinit var client: Client private lateinit var clusterService: ClusterService @@ -84,6 +91,9 @@ internal object NotificationConfigIndex : ConfigOperations { } } + private var indexMappingAsMap: Map? = null + private var indexMappingSchemaVersion: Int = DEFAULT_SCHEMA_VERSION + /** * {@inheritDoc} */ @@ -92,15 +102,37 @@ internal object NotificationConfigIndex : ConfigOperations { NotificationConfigIndex.clusterService = clusterService } + private fun initIndexMappingAndSchemaVersion() { + val indexMappingSource = NotificationConfigIndex::class.java.classLoader.getResource(MAPPING_FILE_NAME)?.readText()!! + indexMappingAsMap = XContentHelper.convertToMap(XContentType.YAML.xContent(), indexMappingSource, false) + indexMappingSchemaVersion = getSchemaVersionFromIndexMapping(indexMappingAsMap) + } + + private fun getSchemaVersionFromIndexMapping(indexMapping: Map?): Int { + var schemaVersion = DEFAULT_SCHEMA_VERSION + if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is Map<*, *>) { + val metaData = indexMapping[_META] as Map<*, *> + if (metaData.containsKey(SCHEMA_VERSION)) { + try { + schemaVersion = metaData[SCHEMA_VERSION] as Int + } catch (e: Exception) { + throw IllegalArgumentException("schema_version can only be Integer") + } + } + } + return schemaVersion + } + /** * Create index using the mapping and settings defined in resource */ @Suppress("TooGenericExceptionCaught") private suspend fun createIndex() { + if (indexMappingAsMap == null) { + initIndexMappingAndSchemaVersion() + } if (!isIndexExists()) { val classLoader = NotificationConfigIndex::class.java.classLoader - val indexMappingSource = classLoader.getResource(MAPPING_FILE_NAME)?.readText()!! - val indexMappingAsMap = XContentHelper.convertToMap(XContentType.YAML.xContent(), indexMappingSource, false) val indexSettingsSource = classLoader.getResource(SETTINGS_FILE_NAME)?.readText()!! val request = CreateIndexRequest(INDEX_NAME) .mapping(indexMappingAsMap) @@ -121,6 +153,22 @@ internal object NotificationConfigIndex : ConfigOperations { } } } + } else { + val currentIndexMappingMetadata = clusterService.state().metadata.indices[INDEX_NAME]?.mapping()?.sourceAsMap() + val currentIndexMappingSchemaVersion = getSchemaVersionFromIndexMapping(currentIndexMappingMetadata) + if (currentIndexMappingSchemaVersion < indexMappingSchemaVersion) { + val putMappingRequest: PutMappingRequest = PutMappingRequest(INDEX_NAME).source(indexMappingAsMap) + client.threadPool().threadContext.stashContext().use { + val response: AcknowledgedResponse = client.suspendUntil { + admin().indices().putMapping(putMappingRequest, it) + } + if (response.isAcknowledged) { + log.info("$LOG_PREFIX:Index $INDEX_NAME update mapping Acknowledged") + } else { + throw IllegalStateException("$LOG_PREFIX:Index $INDEX_NAME update mapping not Acknowledged") + } + } + } } } diff --git a/notifications/notifications/src/test/kotlin/org/opensearch/notifications/index/NotificationConfigIndexTests.kt b/notifications/notifications/src/test/kotlin/org/opensearch/notifications/index/NotificationConfigIndexTests.kt new file mode 100644 index 00000000..6cb8c60a --- /dev/null +++ b/notifications/notifications/src/test/kotlin/org/opensearch/notifications/index/NotificationConfigIndexTests.kt @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.notifications.index + +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import java.lang.reflect.Method +import kotlin.test.assertEquals +import kotlin.test.assertFails + +class NotificationConfigIndexTests { + + @Test + fun `test get schema version`() { + val indexMapping = mapOf( + "_meta" to mapOf("schema_version" to 10), + "user" to "test" + ) + val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping) + assertEquals(10, schemaVersion, "wrong schema version") + } + + @Test + fun `test get schema version without _meta field`() { + val indexMapping = mapOf( + "meta" to mapOf("schema_version" to 10), + "user" to "test" + ) + val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping) + assertEquals(1, schemaVersion, "wrong schema version") + } + + @Test + fun `test get schema version without schema_version field`() { + val indexMapping = mapOf( + "_meta" to mapOf("schema" to 10), + "user" to "test" + ) + val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping) + assertEquals(1, schemaVersion, "wrong schema version") + } + + @Test + fun `test get non number schema_version throw exception`() { + val indexMapping = mapOf( + "_meta" to mapOf("schema_version" to "10"), + "user" to "test" + ) + assertFails { + getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping) + } + } + + companion object { + private lateinit var getSchemaVersionFromIndexMapping: Method + + @BeforeAll + @JvmStatic + fun initialize() { + /* use reflection to get private method */ + getSchemaVersionFromIndexMapping = NotificationConfigIndex::class.java.getDeclaredMethod( + "getSchemaVersionFromIndexMapping", Map::class.java + ) + + getSchemaVersionFromIndexMapping.isAccessible = true + } + } +} From 02e1e76ca1d8c25f47273e89947d2d5108d537de Mon Sep 17 00:00:00 2001 From: zhichao-aws Date: Mon, 3 Jul 2023 17:10:20 +0800 Subject: [PATCH 2/5] put load mapping to initialize step Signed-off-by: zhichao-aws --- .../index/NotificationConfigIndex.kt | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt b/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt index 372aa878..83c2e6cd 100644 --- a/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt +++ b/notifications/notifications/src/main/kotlin/org/opensearch/notifications/index/NotificationConfigIndex.kt @@ -62,13 +62,21 @@ import java.util.concurrent.TimeUnit */ @Suppress("TooManyFunctions") internal object NotificationConfigIndex : ConfigOperations { + const val DEFAULT_SCHEMA_VERSION = 1 + const val _META = "_meta" + const val SCHEMA_VERSION = "schema_version" + private val log by logger(NotificationConfigIndex::class.java) private const val INDEX_NAME = ".opensearch-notifications-config" private const val MAPPING_FILE_NAME = "notifications-config-mapping.yml" private const val SETTINGS_FILE_NAME = "notifications-config-settings.yml" - private const val DEFAULT_SCHEMA_VERSION = 1 - private const val _META = "_meta" - private const val SCHEMA_VERSION = "schema_version" + + private val indexMappingAsMap = XContentHelper.convertToMap( + XContentType.YAML.xContent(), + NotificationConfigIndex::class.java.classLoader.getResource(MAPPING_FILE_NAME)?.readText()!!, + false + ) + private val indexMappingSchemaVersion = getSchemaVersionFromIndexMapping(indexMappingAsMap) private lateinit var client: Client private lateinit var clusterService: ClusterService @@ -91,9 +99,6 @@ internal object NotificationConfigIndex : ConfigOperations { } } - private var indexMappingAsMap: Map? = null - private var indexMappingSchemaVersion: Int = DEFAULT_SCHEMA_VERSION - /** * {@inheritDoc} */ @@ -102,12 +107,6 @@ internal object NotificationConfigIndex : ConfigOperations { NotificationConfigIndex.clusterService = clusterService } - private fun initIndexMappingAndSchemaVersion() { - val indexMappingSource = NotificationConfigIndex::class.java.classLoader.getResource(MAPPING_FILE_NAME)?.readText()!! - indexMappingAsMap = XContentHelper.convertToMap(XContentType.YAML.xContent(), indexMappingSource, false) - indexMappingSchemaVersion = getSchemaVersionFromIndexMapping(indexMappingAsMap) - } - private fun getSchemaVersionFromIndexMapping(indexMapping: Map?): Int { var schemaVersion = DEFAULT_SCHEMA_VERSION if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is Map<*, *>) { @@ -128,9 +127,6 @@ internal object NotificationConfigIndex : ConfigOperations { */ @Suppress("TooGenericExceptionCaught") private suspend fun createIndex() { - if (indexMappingAsMap == null) { - initIndexMappingAndSchemaVersion() - } if (!isIndexExists()) { val classLoader = NotificationConfigIndex::class.java.classLoader val indexSettingsSource = classLoader.getResource(SETTINGS_FILE_NAME)?.readText()!! From 5ff36e3cab2c2c33d647bedf935149df7c49e6b5 Mon Sep 17 00:00:00 2001 From: zhichao-aws Date: Mon, 3 Jul 2023 17:10:52 +0800 Subject: [PATCH 3/5] add schema_version field Signed-off-by: zhichao-aws --- .../src/main/resources/notifications-config-mapping.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/notifications/notifications/src/main/resources/notifications-config-mapping.yml b/notifications/notifications/src/main/resources/notifications-config-mapping.yml index 0b2f68fa..133fc28c 100644 --- a/notifications/notifications/src/main/resources/notifications-config-mapping.yml +++ b/notifications/notifications/src/main/resources/notifications-config-mapping.yml @@ -7,6 +7,8 @@ # Schema file for the notifications-config index # "dynamic" is set to "false" so that only specified fields are indexed instead of all fields. dynamic: false +_meta: + schema_version: 1 properties: metadata: type: object From fe478b29162f4304b698739eafcd6a350e3c3bd8 Mon Sep 17 00:00:00 2001 From: zhichao-aws Date: Mon, 3 Jul 2023 17:11:19 +0800 Subject: [PATCH 4/5] add integ test Signed-off-by: zhichao-aws --- .../integtest/PluginRestTestCase.kt | 10 ++++ .../config/CreateNotificationConfigIT.kt | 58 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/PluginRestTestCase.kt b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/PluginRestTestCase.kt index cfad76df..08a7a33f 100644 --- a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/PluginRestTestCase.kt +++ b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/PluginRestTestCase.kt @@ -24,6 +24,7 @@ import org.opensearch.commons.rest.SecureRestClientBuilder import org.opensearch.core.xcontent.DeprecationHandler import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.notifications.NotificationPlugin +import org.opensearch.notifications.index.NotificationConfigIndex import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus import org.opensearch.test.rest.OpenSearchRestTestCase @@ -370,6 +371,15 @@ abstract class PluginRestTestCase : OpenSearchRestTestCase() { updateClusterSettings(ClusterSetting("transient", "*", null)) } + protected fun getCurrentMappingsSchemaVersion(): Int { + val indexName = ".opensearch-notifications-config" + val getMappingRequest = Request(RestRequest.Method.GET.name, "$indexName/_mappings") + val response = executeRequest(getMappingRequest, RestStatus.OK.status, client()) + val mappingsObject = response.get(indexName).asJsonObject.get("mappings").asJsonObject + return mappingsObject.get(NotificationConfigIndex._META)?.asJsonObject?.get(NotificationConfigIndex.SCHEMA_VERSION)?.asInt + ?: NotificationConfigIndex.DEFAULT_SCHEMA_VERSION + } + protected class ClusterSetting(val type: String, val name: String, var value: Any?) { init { this.value = if (value == null) "null" else "\"" + value + "\"" diff --git a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt index 2d86dfe6..a010c152 100644 --- a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt +++ b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt @@ -6,6 +6,9 @@ package org.opensearch.integtest.config import org.junit.Assert +import org.opensearch.client.Request +import org.opensearch.client.ResponseException +import org.opensearch.client.WarningFailureException import org.opensearch.commons.notifications.model.Chime import org.opensearch.commons.notifications.model.ConfigType import org.opensearch.commons.notifications.model.MethodType @@ -217,4 +220,59 @@ class CreateNotificationConfigIT : PluginRestTestCase() { RestStatus.BAD_REQUEST.status ) } + + fun `test automatically update mappings if encountering higher schema_version`() { + val indexName = ".opensearch-notifications-config" + val deleteIndexRequest = Request(RestRequest.Method.DELETE.name, indexName) + try { + adminClient().performRequest(deleteIndexRequest) + } catch (e: ResponseException) { + /* ignore if the index has not been created */ + assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode)) + } + + val pseudoMappingString = """ + { + "mappings": { + "_meta":{ + "schema_version":0 + }, + "dynamic": "false", + "properties": { + "config": { + "properties": { + "chime": { + "properties": { + "url": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + } + } + } + } + } + } + } + } + """.trimIndent() + try { + executeRequest( + RestRequest.Method.PUT.name, + indexName, + pseudoMappingString, + RestStatus.OK.status + ) + } catch (e: Exception) { + /* ignore warnings */ + assert(e is WarningFailureException) + } + + Assert.assertEquals(0, getCurrentMappingsSchemaVersion()) + createConfig() + Assert.assertEquals(1, getCurrentMappingsSchemaVersion()) + } } From db2133510642699963a2baf544f40fc77b52154c Mon Sep 17 00:00:00 2001 From: zhichao-aws Date: Mon, 7 Aug 2023 14:28:45 +0800 Subject: [PATCH 5/5] add integ test for lacking _meta field Signed-off-by: zhichao-aws --- .../config/CreateNotificationConfigIT.kt | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt index 839035ea..50e5baa9 100644 --- a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt +++ b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/config/CreateNotificationConfigIT.kt @@ -19,6 +19,7 @@ import org.opensearch.commons.notifications.model.Webhook import org.opensearch.core.rest.RestStatus import org.opensearch.integtest.PluginRestTestCase import org.opensearch.notifications.NotificationPlugin.Companion.PLUGIN_BASE_URI +import org.opensearch.notifications.index.NotificationConfigIndex import org.opensearch.notifications.verifySingleConfigEquals import org.opensearch.rest.RestRequest @@ -275,4 +276,61 @@ class CreateNotificationConfigIT : PluginRestTestCase() { createConfig() Assert.assertEquals(1, getCurrentMappingsSchemaVersion()) } + + fun `test _meta field not exists in current mappings`() { + val indexName = ".opensearch-notifications-config" + val deleteIndexRequest = Request(RestRequest.Method.DELETE.name, indexName) + try { + adminClient().performRequest(deleteIndexRequest) + } catch (e: ResponseException) { + /* ignore if the index has not been created */ + assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode)) + } + + val pseudoMappingString = """ + { + "mappings": { + "dynamic": "false", + "properties": { + "config": { + "properties": { + "chime": { + "properties": { + "url": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + } + } + } + } + } + } + } + } + """.trimIndent() + try { + executeRequest( + RestRequest.Method.PUT.name, + indexName, + pseudoMappingString, + RestStatus.OK.status + ) + } catch (e: Exception) { + /* ignore warnings */ + assert(e is WarningFailureException) + } + + val getMappingRequest = Request(RestRequest.Method.GET.name, "$indexName/_mappings") + val responseBefore = executeRequest(getMappingRequest, RestStatus.OK.status, client()) + val mappingsObjectBefore = responseBefore.get(indexName).asJsonObject.get("mappings").asJsonObject + Assert.assertNull("mappings should not have _meta field", mappingsObjectBefore.get(NotificationConfigIndex._META)) + createConfig() + val responseAfter = executeRequest(getMappingRequest, RestStatus.OK.status, client()) + val mappingsObjectAfter = responseAfter.get(indexName).asJsonObject.get("mappings").asJsonObject + Assert.assertEquals(mappingsObjectAfter, mappingsObjectBefore) + } }