Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add auto upgrade mapping logic #699

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.delete.DeleteRequest
Expand All @@ -21,6 +22,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
Expand All @@ -46,23 +48,36 @@ import org.opensearch.notifications.model.NotificationConfigDoc
import org.opensearch.notifications.model.NotificationConfigDocInfo
import org.opensearch.notifications.settings.PluginSettings
import org.opensearch.notifications.util.SecureIndexClient
import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntil
import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntilTimeout
import org.opensearch.script.Script
import org.opensearch.search.SearchHit
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import java.lang.IllegalArgumentException
import java.util.concurrent.TimeUnit

/**
* Class for doing index operations to maintain configurations in cluster.
*/
@Suppress("TooManyFunctions")
internal object NotificationConfigIndex : ConfigOperations {
const val DEFAULT_SCHEMA_VERSION = 1
const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"

private val log by logger(NotificationConfigIndex::class.java)
private const val INDEX_NAME = ".opensearch-notifications-config"
private const val MAPPING_FILE_NAME = "notifications-config-mapping.yml"
private const val SETTINGS_FILE_NAME = "notifications-config-settings.yml"

private val indexMappingAsMap = XContentHelper.convertToMap(
XContentType.YAML.xContent(),
NotificationConfigIndex::class.java.classLoader.getResource(MAPPING_FILE_NAME)?.readText()!!,
false
)
private val indexMappingSchemaVersion = getSchemaVersionFromIndexMapping(indexMappingAsMap)

private lateinit var client: Client
private lateinit var clusterService: ClusterService

Expand Down Expand Up @@ -92,15 +107,28 @@ internal object NotificationConfigIndex : ConfigOperations {
NotificationConfigIndex.clusterService = clusterService
}

private fun getSchemaVersionFromIndexMapping(indexMapping: Map<String, Any>?): Int {
var schemaVersion = DEFAULT_SCHEMA_VERSION
if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is Map<*, *>) {
val metaData = indexMapping[_META] as Map<*, *>
if (metaData.containsKey(SCHEMA_VERSION)) {
try {
schemaVersion = metaData[SCHEMA_VERSION] as Int
} catch (e: Exception) {
throw IllegalArgumentException("schema_version can only be Integer")
}
}
}
return schemaVersion
}
Hailong-am marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create index using the mapping and settings defined in resource
*/
@Suppress("TooGenericExceptionCaught")
private suspend fun createIndex() {
if (!isIndexExists()) {
val classLoader = NotificationConfigIndex::class.java.classLoader
val indexMappingSource = classLoader.getResource(MAPPING_FILE_NAME)?.readText()!!
val indexMappingAsMap = XContentHelper.convertToMap(XContentType.YAML.xContent(), indexMappingSource, false)
val indexSettingsSource = classLoader.getResource(SETTINGS_FILE_NAME)?.readText()!!
val request = CreateIndexRequest(INDEX_NAME)
.mapping(indexMappingAsMap)
Expand All @@ -121,6 +149,22 @@ internal object NotificationConfigIndex : ConfigOperations {
}
}
}
} else {
val currentIndexMappingMetadata = clusterService.state().metadata.indices[INDEX_NAME]?.mapping()?.sourceAsMap()
val currentIndexMappingSchemaVersion = getSchemaVersionFromIndexMapping(currentIndexMappingMetadata)
if (currentIndexMappingSchemaVersion < indexMappingSchemaVersion) {
val putMappingRequest: PutMappingRequest = PutMappingRequest(INDEX_NAME).source(indexMappingAsMap)
client.threadPool().threadContext.stashContext().use {
val response: AcknowledgedResponse = client.suspendUntil {
admin().indices().putMapping(putMappingRequest, it)
}
if (response.isAcknowledged) {
log.info("$LOG_PREFIX:Index $INDEX_NAME update mapping Acknowledged")
} else {
throw IllegalStateException("$LOG_PREFIX:Index $INDEX_NAME update mapping not Acknowledged")
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# Schema file for the notifications-config index
# "dynamic" is set to "false" so that only specified fields are indexed instead of all fields.
dynamic: false
_meta:
schema_version: 1
properties:
metadata:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.core.xcontent.DeprecationHandler
import org.opensearch.core.xcontent.MediaType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.notifications.NotificationPlugin
import org.opensearch.notifications.index.NotificationConfigIndex
import org.opensearch.rest.RestRequest
import org.opensearch.test.rest.OpenSearchRestTestCase
import java.io.IOException
Expand Down Expand Up @@ -370,6 +371,15 @@ abstract class PluginRestTestCase : OpenSearchRestTestCase() {
updateClusterSettings(ClusterSetting("transient", "*", null))
}

protected fun getCurrentMappingsSchemaVersion(): Int {
val indexName = ".opensearch-notifications-config"
val getMappingRequest = Request(RestRequest.Method.GET.name, "$indexName/_mappings")
val response = executeRequest(getMappingRequest, RestStatus.OK.status, client())
val mappingsObject = response.get(indexName).asJsonObject.get("mappings").asJsonObject
return mappingsObject.get(NotificationConfigIndex._META)?.asJsonObject?.get(NotificationConfigIndex.SCHEMA_VERSION)?.asInt
?: NotificationConfigIndex.DEFAULT_SCHEMA_VERSION
}

protected class ClusterSetting(val type: String, val name: String, var value: Any?) {
init {
this.value = if (value == null) "null" else "\"" + value + "\""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package org.opensearch.integtest.config

import org.junit.Assert
import org.opensearch.client.Request
import org.opensearch.client.ResponseException
import org.opensearch.client.WarningFailureException
import org.opensearch.commons.notifications.model.Chime
import org.opensearch.commons.notifications.model.ConfigType
import org.opensearch.commons.notifications.model.MethodType
Expand All @@ -16,6 +19,7 @@ import org.opensearch.commons.notifications.model.Webhook
import org.opensearch.core.rest.RestStatus
import org.opensearch.integtest.PluginRestTestCase
import org.opensearch.notifications.NotificationPlugin.Companion.PLUGIN_BASE_URI
import org.opensearch.notifications.index.NotificationConfigIndex
import org.opensearch.notifications.verifySingleConfigEquals
import org.opensearch.rest.RestRequest

Expand Down Expand Up @@ -217,4 +221,116 @@ class CreateNotificationConfigIT : PluginRestTestCase() {
RestStatus.BAD_REQUEST.status
)
}

fun `test automatically update mappings if encountering higher schema_version`() {
val indexName = ".opensearch-notifications-config"
val deleteIndexRequest = Request(RestRequest.Method.DELETE.name, indexName)
try {
adminClient().performRequest(deleteIndexRequest)
} catch (e: ResponseException) {
/* ignore if the index has not been created */
assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode))
}

val pseudoMappingString = """
{
"mappings": {
"_meta":{
"schema_version":0
},
"dynamic": "false",
"properties": {
"config": {
"properties": {
"chime": {
"properties": {
"url": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
}
""".trimIndent()
zhichao-aws marked this conversation as resolved.
Show resolved Hide resolved
try {
executeRequest(
RestRequest.Method.PUT.name,
indexName,
pseudoMappingString,
RestStatus.OK.status
)
} catch (e: Exception) {
/* ignore warnings */
assert(e is WarningFailureException)
}

Assert.assertEquals(0, getCurrentMappingsSchemaVersion())
createConfig()
Assert.assertEquals(1, getCurrentMappingsSchemaVersion())
}

fun `test _meta field not exists in current mappings`() {
val indexName = ".opensearch-notifications-config"
val deleteIndexRequest = Request(RestRequest.Method.DELETE.name, indexName)
try {
adminClient().performRequest(deleteIndexRequest)
} catch (e: ResponseException) {
/* ignore if the index has not been created */
assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode))
}

val pseudoMappingString = """
{
"mappings": {
"dynamic": "false",
"properties": {
"config": {
"properties": {
"chime": {
"properties": {
"url": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
}
""".trimIndent()
try {
executeRequest(
RestRequest.Method.PUT.name,
indexName,
pseudoMappingString,
RestStatus.OK.status
)
} catch (e: Exception) {
/* ignore warnings */
assert(e is WarningFailureException)
}

val getMappingRequest = Request(RestRequest.Method.GET.name, "$indexName/_mappings")
val responseBefore = executeRequest(getMappingRequest, RestStatus.OK.status, client())
val mappingsObjectBefore = responseBefore.get(indexName).asJsonObject.get("mappings").asJsonObject
Assert.assertNull("mappings should not have _meta field", mappingsObjectBefore.get(NotificationConfigIndex._META))
createConfig()
val responseAfter = executeRequest(getMappingRequest, RestStatus.OK.status, client())
val mappingsObjectAfter = responseAfter.get(indexName).asJsonObject.get("mappings").asJsonObject
Assert.assertEquals(mappingsObjectAfter, mappingsObjectBefore)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.notifications.index

import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import java.lang.reflect.Method
import kotlin.test.assertEquals
import kotlin.test.assertFails

class NotificationConfigIndexTests {

@Test
fun `test get schema version`() {
val indexMapping = mapOf(
"_meta" to mapOf("schema_version" to 10),
"user" to "test"
)
val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping)
assertEquals(10, schemaVersion, "wrong schema version")
}

@Test
fun `test get schema version without _meta field`() {
val indexMapping = mapOf(
"meta" to mapOf("schema_version" to 10),
"user" to "test"
)
val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping)
assertEquals(1, schemaVersion, "wrong schema version")
}

@Test
fun `test get schema version without schema_version field`() {
val indexMapping = mapOf(
"_meta" to mapOf("schema" to 10),
"user" to "test"
)
val schemaVersion = getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping)
assertEquals(1, schemaVersion, "wrong schema version")
}

@Test
fun `test get non number schema_version throw exception`() {
val indexMapping = mapOf(
"_meta" to mapOf("schema_version" to "10"),
"user" to "test"
)
assertFails {
getSchemaVersionFromIndexMapping.invoke(NotificationConfigIndex, indexMapping)
}
}

companion object {
private lateinit var getSchemaVersionFromIndexMapping: Method

@BeforeAll
@JvmStatic
fun initialize() {
/* use reflection to get private method */
getSchemaVersionFromIndexMapping = NotificationConfigIndex::class.java.getDeclaredMethod(
"getSchemaVersionFromIndexMapping", Map::class.java
)

getSchemaVersionFromIndexMapping.isAccessible = true
}
}
}