Skip to content

Commit

Permalink
add auto upgrade mapping logic (#699)
Browse files Browse the repository at this point in the history
* add auto upgrade mapping logic

Signed-off-by: zhichao-aws <[email protected]>

* put load mapping to initialize step

Signed-off-by: zhichao-aws <[email protected]>

* add schema_version field

Signed-off-by: zhichao-aws <[email protected]>

* add integ test

Signed-off-by: zhichao-aws <[email protected]>

* add integ test for lacking _meta field

Signed-off-by: zhichao-aws <[email protected]>

---------

Signed-off-by: zhichao-aws <[email protected]>
(cherry picked from commit 5670c35)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Aug 8, 2023
1 parent b67c5bd commit 2bed5b1
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.delete.DeleteRequest
Expand All @@ -21,6 +22,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
Expand All @@ -46,23 +48,36 @@ import org.opensearch.notifications.model.NotificationConfigDoc
import org.opensearch.notifications.model.NotificationConfigDocInfo
import org.opensearch.notifications.settings.PluginSettings
import org.opensearch.notifications.util.SecureIndexClient
import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntil
import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntilTimeout
import org.opensearch.script.Script
import org.opensearch.search.SearchHit
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import java.lang.IllegalArgumentException
import java.util.concurrent.TimeUnit

/**
* Class for doing index operations to maintain configurations in cluster.
*/
@Suppress("TooManyFunctions")
internal object NotificationConfigIndex : ConfigOperations {
const val DEFAULT_SCHEMA_VERSION = 1
const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"

private val log by logger(NotificationConfigIndex::class.java)
private const val INDEX_NAME = ".opensearch-notifications-config"
private const val MAPPING_FILE_NAME = "notifications-config-mapping.yml"
private const val SETTINGS_FILE_NAME = "notifications-config-settings.yml"

private val indexMappingAsMap = XContentHelper.convertToMap(
XContentType.YAML.xContent(),
NotificationConfigIndex::class.java.classLoader.getResource(MAPPING_FILE_NAME)?.readText()!!,
false
)
private val indexMappingSchemaVersion = getSchemaVersionFromIndexMapping(indexMappingAsMap)

private lateinit var client: Client
private lateinit var clusterService: ClusterService

Expand Down Expand Up @@ -92,15 +107,28 @@ internal object NotificationConfigIndex : ConfigOperations {
NotificationConfigIndex.clusterService = clusterService
}

private fun getSchemaVersionFromIndexMapping(indexMapping: Map<String, Any>?): Int {
var schemaVersion = DEFAULT_SCHEMA_VERSION
if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is Map<*, *>) {
val metaData = indexMapping[_META] as Map<*, *>
if (metaData.containsKey(SCHEMA_VERSION)) {
try {
schemaVersion = metaData[SCHEMA_VERSION] as Int
} catch (e: Exception) {
throw IllegalArgumentException("schema_version can only be Integer")
}
}
}
return schemaVersion
}

/**
* Create index using the mapping and settings defined in resource
*/
@Suppress("TooGenericExceptionCaught")
private suspend fun createIndex() {
if (!isIndexExists()) {
val classLoader = NotificationConfigIndex::class.java.classLoader
val indexMappingSource = classLoader.getResource(MAPPING_FILE_NAME)?.readText()!!
val indexMappingAsMap = XContentHelper.convertToMap(XContentType.YAML.xContent(), indexMappingSource, false)
val indexSettingsSource = classLoader.getResource(SETTINGS_FILE_NAME)?.readText()!!
val request = CreateIndexRequest(INDEX_NAME)
.mapping(indexMappingAsMap)
Expand All @@ -121,6 +149,22 @@ internal object NotificationConfigIndex : ConfigOperations {
}
}
}
} else {
val currentIndexMappingMetadata = clusterService.state().metadata.indices[INDEX_NAME]?.mapping()?.sourceAsMap()
val currentIndexMappingSchemaVersion = getSchemaVersionFromIndexMapping(currentIndexMappingMetadata)
if (currentIndexMappingSchemaVersion < indexMappingSchemaVersion) {
val putMappingRequest: PutMappingRequest = PutMappingRequest(INDEX_NAME).source(indexMappingAsMap)
client.threadPool().threadContext.stashContext().use {
val response: AcknowledgedResponse = client.suspendUntil {
admin().indices().putMapping(putMappingRequest, it)
}
if (response.isAcknowledged) {
log.info("$LOG_PREFIX:Index $INDEX_NAME update mapping Acknowledged")
} else {
throw IllegalStateException("$LOG_PREFIX:Index $INDEX_NAME update mapping not Acknowledged")
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# Schema file for the notifications-config index
# "dynamic" is set to "false" so that only specified fields are indexed instead of all fields.
dynamic: false
_meta:
schema_version: 1
properties:
metadata:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.core.xcontent.DeprecationHandler
import org.opensearch.core.xcontent.MediaType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.notifications.NotificationPlugin
import org.opensearch.notifications.index.NotificationConfigIndex
import org.opensearch.rest.RestRequest
import org.opensearch.test.rest.OpenSearchRestTestCase
import java.io.IOException
Expand Down Expand Up @@ -364,6 +365,15 @@ abstract class PluginRestTestCase : OpenSearchRestTestCase() {
updateClusterSettings(ClusterSetting("transient", "*", null))
}

protected fun getCurrentMappingsSchemaVersion(): Int {
val indexName = ".opensearch-notifications-config"
val getMappingRequest = Request(RestRequest.Method.GET.name, "$indexName/_mappings")
val response = executeRequest(getMappingRequest, RestStatus.OK.status, client())
val mappingsObject = response.get(indexName).asJsonObject.get("mappings").asJsonObject
return mappingsObject.get(NotificationConfigIndex._META)?.asJsonObject?.get(NotificationConfigIndex.SCHEMA_VERSION)?.asInt
?: NotificationConfigIndex.DEFAULT_SCHEMA_VERSION
}

protected class ClusterSetting(val type: String, val name: String, var value: Any?) {
init {
this.value = if (value == null) "null" else "\"" + value + "\""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package org.opensearch.integtest.config

import org.junit.Assert
import org.opensearch.client.Request
import org.opensearch.client.ResponseException
import org.opensearch.client.WarningFailureException
import org.opensearch.commons.notifications.model.Chime
import org.opensearch.commons.notifications.model.ConfigType
import org.opensearch.commons.notifications.model.MethodType
Expand All @@ -16,6 +19,7 @@ import org.opensearch.commons.notifications.model.Webhook
import org.opensearch.core.rest.RestStatus
import org.opensearch.integtest.PluginRestTestCase
import org.opensearch.notifications.NotificationPlugin.Companion.PLUGIN_BASE_URI
import org.opensearch.notifications.index.NotificationConfigIndex
import org.opensearch.notifications.verifySingleConfigEquals
import org.opensearch.rest.RestRequest

Expand Down Expand Up @@ -217,4 +221,116 @@ class CreateNotificationConfigIT : PluginRestTestCase() {
RestStatus.BAD_REQUEST.status
)
}

fun `test automatically update mappings if encountering higher schema_version`() {
val indexName = ".opensearch-notifications-config"
val deleteIndexRequest = Request(RestRequest.Method.DELETE.name, indexName)
try {
adminClient().performRequest(deleteIndexRequest)
} catch (e: ResponseException) {
/* ignore if the index has not been created */
assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode))
}

val pseudoMappingString = """
{
"mappings": {
"_meta":{
"schema_version":0
},
"dynamic": "false",
"properties": {
"config": {
"properties": {
"chime": {
"properties": {
"url": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
}
""".trimIndent()
try {
executeRequest(
RestRequest.Method.PUT.name,
indexName,
pseudoMappingString,
RestStatus.OK.status
)
} catch (e: Exception) {
/* ignore warnings */
assert(e is WarningFailureException)
}

Assert.assertEquals(0, getCurrentMappingsSchemaVersion())
createConfig()
Assert.assertEquals(1, getCurrentMappingsSchemaVersion())
}

fun `test _meta field not exists in current mappings`() {
val indexName = ".opensearch-notifications-config"
val deleteIndexRequest = Request(RestRequest.Method.DELETE.name, indexName)
try {
adminClient().performRequest(deleteIndexRequest)
} catch (e: ResponseException) {
/* ignore if the index has not been created */
assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode))
}

val pseudoMappingString = """
{
"mappings": {
"dynamic": "false",
"properties": {
"config": {
"properties": {
"chime": {
"properties": {
"url": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
}
""".trimIndent()
try {
executeRequest(
RestRequest.Method.PUT.name,
indexName,
pseudoMappingString,
RestStatus.OK.status
)
} catch (e: Exception) {
/* ignore warnings */
assert(e is WarningFailureException)
}

val getMappingRequest = Request(RestRequest.Method.GET.name, "$indexName/_mappings")
val responseBefore = executeRequest(getMappingRequest, RestStatus.OK.status, client())
val mappingsObjectBefore = responseBefore.get(indexName).asJsonObject.get("mappings").asJsonObject
Assert.assertNull("mappings should not have _meta field", mappingsObjectBefore.get(NotificationConfigIndex._META))
createConfig()
val responseAfter = executeRequest(getMappingRequest, RestStatus.OK.status, client())
val mappingsObjectAfter = responseAfter.get(indexName).asJsonObject.get("mappings").asJsonObject
Assert.assertEquals(mappingsObjectAfter, mappingsObjectBefore)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.notifications.index

import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import java.lang.reflect.Method
import kotlin.test.assertEquals
import kotlin.test.assertFails

class NotificationConfigIndexTests {

@Test
fun `test get schema version`() {
val indexMapping = mapOf(
"_meta" to mapOf("schema_version" to 10),
"user" to "test"
)
val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping)
assertEquals(10, schemaVersion, "wrong schema version")
}

@Test
fun `test get schema version without _meta field`() {
val indexMapping = mapOf(
"meta" to mapOf("schema_version" to 10),
"user" to "test"
)
val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping)
assertEquals(1, schemaVersion, "wrong schema version")
}

@Test
fun `test get schema version without schema_version field`() {
val indexMapping = mapOf(
"_meta" to mapOf("schema" to 10),
"user" to "test"
)
val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping)
assertEquals(1, schemaVersion, "wrong schema version")
}

@Test
fun `test get non number schema_version throw exception`() {
val indexMapping = mapOf(
"_meta" to mapOf("schema_version" to "10"),
"user" to "test"
)
assertFails {
getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping)
}
}

companion object {
private lateinit var getSchemaVersionFromIndexMapping: Method

@BeforeAll
@JvmStatic
fun initialize() {
/* use reflection to get private method */
getSchemaVersionFromIndexMapping = NotificationConfigIndex::class.java.getDeclaredMethod(
"getSchemaVersionFromIndexMapping", Map::class.java
)

getSchemaVersionFromIndexMapping.isAccessible = true
}
}
}

0 comments on commit 2bed5b1

Please sign in to comment.