diff --git a/build-tools/coverage.gradle b/build-tools/coverage.gradle index fd9530964..8abac12a1 100644 --- a/build-tools/coverage.gradle +++ b/build-tools/coverage.gradle @@ -21,7 +21,7 @@ // testing tasks don't derive from Test so the jacoco plugin can't do this automatically. def jacocoDir = "${buildDir}/jacoco" -task dummyTest(type: Test) { +tasks.register("dummyTest", Test) { enabled = false workingDir = file("/") // Force absolute path to jacoco agent jar jacoco { @@ -31,7 +31,7 @@ task dummyTest(type: Test) { } } -task dummyIntegTest(type: Test) { +tasks.register("dummyIntegTest", Test) { enabled = false workingDir = file("/") // Force absolute path to jacoco agent jar jacoco { @@ -51,8 +51,8 @@ jacocoTestReport { sourceDirectories.from = "src/main/kotlin" classDirectories.from = sourceSets.main.output reports { - html.enabled = true // human readable - xml.enabled = true // for coverlay + html.required = true // human readable + xml.required = true // for coverlay } } diff --git a/build-tools/pkgbuild.gradle b/build-tools/pkgbuild.gradle index 6425d1e78..71cea1a7a 100644 --- a/build-tools/pkgbuild.gradle +++ b/build-tools/pkgbuild.gradle @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -apply plugin: 'nebula.ospackage' +apply plugin: 'com.netflix.nebula.ospackage' // This is afterEvaluate because the bundlePlugin ZIP task is updated afterEvaluate and changes the ZIP name to match the plugin name afterEvaluate { @@ -13,7 +13,7 @@ afterEvaluate { version = "${project.version}" - "-SNAPSHOT" into '/usr/share/opensearch/plugins' - from(zipTree(bundlePlugin.archivePath)) { + from(zipTree(bundlePlugin.archiveFile)) { into opensearchplugin.name } @@ -41,24 +41,24 @@ afterEvaluate { arch = 'NOARCH' dependsOn 'assemble' finalizedBy 'renameRpm' - task renameRpm(type: Copy) { + tasks.register("renameRpm", Copy) { from("$buildDir/distributions") into("$buildDir/distributions") - include archiveName - rename archiveName, "${packageName}-${version}.rpm" - doLast 
{ delete file("$buildDir/distributions/$archiveName") } + include archiveFileName + rename archiveFileName, "${packageName}-${version}.rpm" + doLast { delete file("$buildDir/distributions/$archiveFileName") } } } buildDeb { arch = 'all' dependsOn 'assemble' finalizedBy 'renameDeb' - task renameDeb(type: Copy) { + tasks.register("renameDeb", Copy) { from("$buildDir/distributions") into("$buildDir/distributions") - include archiveName - rename archiveName, "${packageName}-${version}.deb" - doLast { delete file("$buildDir/distributions/$archiveName") } + include archiveFileName + rename archiveFileName, "${packageName}-${version}.deb" + doLast { delete file("$buildDir/distributions/$archiveFileName") } } } } diff --git a/build.gradle b/build.gradle index 1d6a2dc9e..1c8c2b83a 100644 --- a/build.gradle +++ b/build.gradle @@ -63,7 +63,7 @@ buildscript { notifications_core_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-notifications-core-' + notifications_no_snapshot + '.zip' - kotlin_version = System.getProperty("kotlin.version", "1.6.10") + kotlin_version = System.getProperty("kotlin.version", "1.8.21") security_plugin_version = System.getProperty("security.version", opensearch_build) } @@ -85,7 +85,7 @@ buildscript { } plugins { - id 'nebula.ospackage' version "8.3.0" + id "com.netflix.nebula.ospackage" version "11.3.0" id "com.dorongold.task-tree" version "2.1.1" } @@ -137,6 +137,8 @@ opensearchplugin { description 'OpenSearch Index Management Plugin' classname 'org.opensearch.indexmanagement.IndexManagementPlugin' extendedPlugins = ['opensearch-job-scheduler'] + licenseFile rootProject.file('LICENSE') + noticeFile rootProject.file('NOTICE') } tasks.named("integTest").configure { @@ -182,8 +184,6 @@ configurations.testImplementation { ext { projectSubstitutions = [:] - licenseFile = rootProject.file('LICENSE') - noticeFile = 
rootProject.file('NOTICE') } allprojects { diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 249e5832f..c1962a79e 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ae04661ee..37aef8d3f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip +networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index a69d9cb6c..aeb74cbb4 100755 --- a/gradlew +++ b/gradlew @@ -55,7 +55,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -80,13 +80,10 @@ do esac done -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -APP_NAME="Gradle" +# This is normally unused +# shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -143,12 +140,16 @@ fi if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) + # In POSIX sh, ulimit -H is undefined. 
That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac case $MAX_FD in #( '' | soft) :;; #( *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -193,6 +194,10 @@ if "$cygwin" || "$msys" ; then done fi + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + # Collect all arguments for the java command; # * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of # shell script including quotes and variable substitutions, so put them in diff --git a/gradlew.bat b/gradlew.bat index 53a6b238d..6689b85be 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -26,6 +26,7 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 if "%DIRNAME%"=="" set DIRNAME=. 
+@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% diff --git a/spi/build.gradle b/spi/build.gradle index 4b398668a..2fa3f6bf7 100644 --- a/spi/build.gradle +++ b/spi/build.gradle @@ -23,15 +23,23 @@ ext { noticeFile = rootProject.file('NOTICE') } +plugins.withId('java') { + sourceCompatibility = targetCompatibility = JavaVersion.VERSION_11 +} + +plugins.withId('org.jetbrains.kotlin.jvm') { + compileKotlin.kotlinOptions.jvmTarget = compileTestKotlin.kotlinOptions.jvmTarget = JavaVersion.VERSION_11 +} + jacoco { toolVersion = '0.8.7' - reportsDir = file("$buildDir/JacocoReport") + reportsDirectory = file("$buildDir/JacocoReport") } jacocoTestReport { reports { - xml.enabled false - csv.enabled false + xml.required.set(false) + csv.required.set(false) html.destination file("${buildDir}/jacoco/") } } @@ -44,7 +52,7 @@ repositories { maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } } -configurations.all { +configurations.configureEach { if (it.state != Configuration.State.UNRESOLVED) return resolutionStrategy { force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" @@ -69,21 +77,22 @@ idea.module { excludeDirs -= file("$buildDir") } -task sourcesJar(type: Jar, dependsOn: classes) { - classifier = 'sources' +tasks.register("sourcesJar", Jar) { + dependsOn "classes" + archiveClassifier = 'sources' from sourceSets.main.allSource } test { doFirst { test.classpath -= project.files(project.tasks.named('shadowJar')) - test.classpath -= project.configurations.getByName(ShadowBasePlugin.CONFIGURATION_NAME) - test.classpath += project.extensions.getByType(SourceSetContainer).getByName(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath + test.classpath -= project.configurations.named(ShadowBasePlugin.CONFIGURATION_NAME) + test.classpath += project.extensions.getByType(SourceSetContainer).named(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath } systemProperty 'tests.security.manager', 'false' } -task integTest(type: 
RestIntegTestTask) { +tasks.register("integTest", RestIntegTestTask) { description 'Run integ test with opensearch test framework' group 'verification' systemProperty 'tests.security.manager', 'false' diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 09f138c8d..7f15a0056 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -36,7 +36,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementHistory import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner -import org.opensearch.indexmanagement.indexstatemanagement.MetadataService import org.opensearch.indexmanagement.indexstatemanagement.PluginVersionSweepCoordinator import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig @@ -75,7 +74,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.upda import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.validation.ActionValidation -import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction import org.opensearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction import org.opensearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction @@ -432,12 +430,9 @@ class 
IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin .registerExtensionChecker(extensionChecker) .registerIndexMetadataProvider(indexMetadataProvider) - val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices) - val templateService = ISMTemplateService(client, clusterService, xContentRegistry, indexManagementIndices) - val managedIndexCoordinator = ManagedIndexCoordinator( environment.settings(), - client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider + client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider ) val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt index 9c46489d8..b957f7714 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt @@ -176,7 +176,7 @@ class IndexStateManagementHistory( clusterStateRequest, object : ActionListener { override fun onResponse(clusterStateResponse: ClusterStateResponse) { - if (!clusterStateResponse.state.metadata.indices.isEmpty) { + if (!clusterStateResponse.state.metadata.indices.isEmpty()) { val indicesToDelete = getIndicesToDelete(clusterStateResponse) logger.info("Deleting old history indices viz $indicesToDelete") deleteAllOldHistoryIndices(indicesToDelete) @@ -199,7 +199,10 @@ class IndexStateManagementHistory( val creationTime = indexMetaData.creationDate if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis) { - val alias = indexMetaData.aliases.firstOrNull { IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS == it.value.alias } + 
val alias = indexMetaData.aliases.entries.firstOrNull { + alias -> + IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS == alias.value.alias + } if (alias != null && historyEnabled) { // If index has write alias and history is enable, don't delete the index. continue diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index bb286ea9e..eeeeb5252 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -58,10 +58,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JITTER import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.METADATA_SERVICE_ENABLED -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.METADATA_SERVICE_STATUS import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SWEEP_PERIOD -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.TEMPLATE_MIGRATION_CONTROL import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE @@ -74,7 +71,6 @@ import 
org.opensearch.indexmanagement.indexstatemanagement.util.isFailed import org.opensearch.indexmanagement.indexstatemanagement.util.isPolicyCompleted import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.updateEnableManagedIndexRequest -import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext import org.opensearch.indexmanagement.opensearchapi.contentParser import org.opensearch.indexmanagement.opensearchapi.parseFromSearchResponse @@ -89,7 +85,6 @@ import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.threadpool.Scheduler import org.opensearch.threadpool.ThreadPool -import java.time.Instant /** * Listens for cluster changes to pick up new indices to manage. @@ -114,8 +109,6 @@ class ManagedIndexCoordinator( private val clusterService: ClusterService, private val threadPool: ThreadPool, indexManagementIndices: IndexManagementIndices, - private val metadataService: MetadataService, - private val templateService: ISMTemplateService, private val indexMetadataProvider: IndexMetadataProvider ) : ClusterStateListener, CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")), @@ -130,12 +123,9 @@ class ManagedIndexCoordinator( @Volatile private var lastFullSweepTimeNano = System.nanoTime() @Volatile private var indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings) - @Volatile private var metadataServiceEnabled = METADATA_SERVICE_ENABLED.get(settings) @Volatile private var sweepPeriod = SWEEP_PERIOD.get(settings) @Volatile private var retryPolicy = BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings)) - @Volatile private var templateMigrationEnabled: Boolean = 
true - @Volatile private var templateMigrationEnabledSetting = TEMPLATE_MIGRATION_CONTROL.get(settings) @Volatile private var jobInterval = JOB_INTERVAL.get(settings) @Volatile private var jobJitter = JITTER.get(settings) @@ -159,18 +149,6 @@ class ManagedIndexCoordinator( indexStateManagementEnabled = it if (!indexStateManagementEnabled) disable() else enable() } - clusterService.clusterSettings.addSettingsUpdateConsumer(METADATA_SERVICE_STATUS) { - metadataServiceEnabled = it == 0 - if (!metadataServiceEnabled) { - logger.info("Canceling metadata moving job because of cluster setting update.") - scheduledMoveMetadata?.cancel() - } else initMoveMetadata() - } - clusterService.clusterSettings.addSettingsUpdateConsumer(TEMPLATE_MIGRATION_CONTROL) { - templateMigrationEnabled = it >= 0L - if (!templateMigrationEnabled) scheduledTemplateMigration?.cancel() - else initTemplateMigration(it) - } clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) { millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count) } @@ -186,10 +164,6 @@ class ManagedIndexCoordinator( // Init background sweep when promoted to being cluster manager initBackgroundSweep() - - initMoveMetadata() - - initTemplateMigration(templateMigrationEnabledSetting) } fun offClusterManager() { @@ -227,8 +201,6 @@ class ManagedIndexCoordinator( override fun afterStart() { initBackgroundSweep() - - initMoveMetadata() } override fun beforeStop() { @@ -241,8 +213,6 @@ class ManagedIndexCoordinator( initBackgroundSweep() indexStateManagementEnabled = true - initMoveMetadata() - // Calling initBackgroundSweep() beforehand runs a sweep ensuring that policies removed from indices // and indices being deleted are accounted for prior to re-enabling jobs launch { @@ -510,76 +480,6 @@ class ManagedIndexCoordinator( scheduledFullSweep = threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, executorName()) } - fun initMoveMetadata() { - if 
(!metadataServiceEnabled) return - if (!isIndexStateManagementEnabled()) return - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return - scheduledMoveMetadata?.cancel() - - if (metadataService.finishFlag) { - logger.info("Re-enable Metadata Service.") - metadataService.reenableMetadataService() - } - - val scheduledJob = Runnable { - launch { - try { - if (metadataService.finishFlag) { - logger.info("Cancel background move metadata process.") - scheduledMoveMetadata?.cancel() - } - - logger.info("Performing move cluster state metadata.") - metadataService.moveMetadata() - } catch (e: Exception) { - logger.error("Failed to move cluster state metadata", e) - } - } - } - - scheduledMoveMetadata = threadPool.scheduleWithFixedDelay(scheduledJob, TimeValue.timeValueMinutes(1), executorName()) - } - - fun initTemplateMigration(enableSetting: Long) { - if (!templateMigrationEnabled) return - if (!isIndexStateManagementEnabled()) return - if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return - scheduledTemplateMigration?.cancel() - - // if service has finished, re-enable it - if (templateService.finishFlag) { - logger.info("Re-enable template migration service.") - templateService.reenableTemplateMigration() - } - - val scheduledJob = Runnable { - launch { - try { - if (templateService.finishFlag) { - logger.info("ISM template migration process finished, cancel scheduled job.") - scheduledTemplateMigration?.cancel() - return@launch - } - - logger.info("Performing ISM template migration.") - if (enableSetting == 0L) { - if (onClusterManagerTimeStamp != 0L) - templateService.doMigration(Instant.ofEpochMilli(onClusterManagerTimeStamp)) - else { - logger.error("No valid onClusterManager time cached, cancel ISM template migration job.") - scheduledTemplateMigration?.cancel() - } - } else - templateService.doMigration(Instant.ofEpochMilli(enableSetting)) - } catch (e: Exception) { - logger.error("Failed to migrate ISM template", e) - 
} - } - } - - scheduledTemplateMigration = threadPool.scheduleWithFixedDelay(scheduledJob, TimeValue.timeValueMinutes(1), executorName()) - } - private fun getFullSweepElapsedTime(): TimeValue = TimeValue.timeValueNanos(System.nanoTime() - lastFullSweepTimeNano) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index a6fdfdba8..c8d65321a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -262,12 +262,12 @@ object ManagedIndexRunner : } // Check the cluster state for the index metadata - var clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index) + val clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index) val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getCustomIndexUUID(it) } // If the index metadata is null, the index is not in the cluster state. If the index metadata is not null, but // the cluster state index uuid differs from the one in the managed index config then the config is referring - // to a different index which does not exist in the cluster. We need to check all of the extensions to confirm an index exists + // to a different index which does not exist in the cluster. 
We need to check all the extensions to confirm an index exists if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) { // If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } @@ -846,7 +846,7 @@ object ManagedIndexRunner : val response: ClusterStateResponse = client.admin().cluster().suspendUntil { state(clusterStateRequest, it) } - indexMetaData = response.state.metadata.indices.firstOrNull()?.value + indexMetaData = response.state.metadata.indices[index] } catch (e: Exception) { logger.error("Failed to get IndexMetaData from cluster manager cluster state for index=$index", e) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt deleted file mode 100644 index c42682f71..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement - -import org.apache.logging.log4j.LogManager -import org.opensearch.ExceptionsHelper -import org.opensearch.action.ActionListener -import org.opensearch.action.DocWriteRequest -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse -import org.opensearch.action.bulk.BackoffPolicy -import org.opensearch.action.bulk.BulkItemResponse -import org.opensearch.action.bulk.BulkRequest -import org.opensearch.action.bulk.BulkResponse -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.client.Client -import 
org.opensearch.cluster.service.ClusterService -import org.opensearch.common.settings.Settings -import org.opensearch.common.unit.TimeValue -import org.opensearch.index.Index -import org.opensearch.indexmanagement.IndexManagementIndices -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest -import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataIndexRequest -import org.opensearch.indexmanagement.indexstatemanagement.util.revertManagedIndexMetadataID -import org.opensearch.indexmanagement.opensearchapi.retry -import org.opensearch.indexmanagement.opensearchapi.suspendUntil -import org.opensearch.indexmanagement.util.IndexManagementException -import org.opensearch.indexmanagement.util.OpenForTesting -import org.opensearch.rest.RestStatus -import java.lang.Exception - -/** - * When all nodes have same version IM plugin (CDI/DDI finished) - * MetadataService starts to move metadata from cluster state to config index - */ -@OpenForTesting -@Suppress("MagicNumber", "ReturnCount", "LongMethod", "ComplexMethod") -class MetadataService( - private val client: Client, - private val clusterService: ClusterService, - private val skipExecution: SkipExecution, - private val imIndices: IndexManagementIndices -) { - private val logger = LogManager.getLogger(javaClass) - - @Volatile private var runningLock = false // in case 2 moveMetadata() process running - - private val successfullyIndexedIndices = mutableSetOf() - private var failedToIndexIndices = mutableMapOf() - private var failedToCleanIndices = mutableSetOf() - - private var counter = 0 - final var 
runTimeCounter = 1 - private set - private val maxRunTime = 10 - - // used in coordinator sweep to cancel scheduled process - @Volatile final var finishFlag = false - private set - fun reenableMetadataService() { finishFlag = false } - - @Volatile private var retryPolicy = - BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(50), 3) - - suspend fun moveMetadata() { - if (runningLock) { - logger.info("There is a move metadata process running...") - return - } else if (finishFlag) { - logger.info("Move metadata has finished.") - return - } - try { - runningLock = true - - if (skipExecution.flag) { - logger.info("Cluster still has nodes running old version of ISM plugin, skip ping execution on new nodes until all nodes upgraded") - runningLock = false - return - } - - if (!imIndices.indexManagementIndexExists()) { - logger.info("ISM config index not exist, so we cancel the metadata migration job.") - finishFlag = true; runningLock = false; runTimeCounter = 0 - return - } - - if (runTimeCounter > maxRunTime) { - updateStatusSetting(-1) - finishFlag = true; runningLock = false; runTimeCounter = 0 - return - } - logger.info("Doing metadata migration $runTimeCounter time.") - - val indicesMetadata = clusterService.state().metadata.indices - var clusterStateManagedIndexMetadata = indicesMetadata.map { - it.key to it.value.getManagedIndexMetadata() - }.filter { it.second != null }.distinct().toMap() - // filter out previous failedToClean indices which already been indexed - clusterStateManagedIndexMetadata = - clusterStateManagedIndexMetadata.filter { it.key !in failedToCleanIndices.map { index -> index.name } } - - // filter out cluster state metadata with outdated index uuid - val corruptManagedIndices = mutableListOf() - val indexUuidMap = mutableMapOf() - clusterStateManagedIndexMetadata.forEach { (indexName, metadata) -> - val indexMetadata = indicesMetadata[indexName] - val currentIndexUuid = indexMetadata.indexUUID - if (currentIndexUuid != metadata?.indexUuid) 
{ - corruptManagedIndices.add(indexMetadata.index) - } else { - indexUuidMap[currentIndexUuid] = indexName - } - } - logger.info("Corrupt managed indices with outdated index uuid in metadata: $corruptManagedIndices") - clusterStateManagedIndexMetadata = clusterStateManagedIndexMetadata.filter { (indexName, _) -> - indexName !in corruptManagedIndices.map { it.name } - } - - if (clusterStateManagedIndexMetadata.isEmpty()) { - if (counter++ > 2 && corruptManagedIndices.isEmpty()) { - logger.info("Move Metadata succeed, set finish flag to true. Indices failed to get indexed: $failedToIndexIndices") - updateStatusSetting(1) - finishFlag = true; runningLock = false; runTimeCounter = 0 - return - } - if (failedToCleanIndices.isNotEmpty()) { - logger.info("Failed to clean indices: $failedToCleanIndices. Only clean cluster state metadata in this run.") - cleanMetadatas(failedToCleanIndices.toList()) - finishFlag = false; runningLock = false - return - } - } else { - counter = 0; finishFlag = false // index metadata for indices which metadata hasn't been indexed - val bulkIndexReq = - clusterStateManagedIndexMetadata.mapNotNull { it.value }.map { - managedIndexMetadataIndexRequest( - it, - waitRefresh = false, // should be set at bulk request level - create = true // restrict this as create operation - ) - } - // remove the part which gonna be indexed from last time failedToIndex - failedToIndexIndices = failedToIndexIndices.filterKeys { it !in indexUuidMap.keys }.toMutableMap() - successfullyIndexedIndices.clear() - indexMetadatas(bulkIndexReq) - - logger.info("success indexed: ${successfullyIndexedIndices.map { indexUuidMap[it] }}") - logger.info( - "failed indexed: ${failedToIndexIndices.map { indexUuidMap[it.key] }};" + - "failed reason: ${failedToIndexIndices.values.distinct()}" - ) - } - - // clean metadata for indices which metadata already been indexed - val indicesToCleanMetadata = - indexUuidMap.filter { it.key in successfullyIndexedIndices }.map { Index(it.value, 
it.key) } - .toList() + failedToCleanIndices + corruptManagedIndices - - cleanMetadatas(indicesToCleanMetadata) - if (failedToCleanIndices.isNotEmpty()) { - logger.info("Failed to clean cluster metadata for: ${failedToCleanIndices.map { it.name }}") - } - - runTimeCounter++ - } finally { - runningLock = false - } - } - - private suspend fun updateStatusSetting(status: Int) { - val newSetting = Settings.builder().put(ManagedIndexSettings.METADATA_SERVICE_STATUS.key, status) - val request = ClusterUpdateSettingsRequest().persistentSettings(newSetting) - retryPolicy.retry(logger, listOf(RestStatus.INTERNAL_SERVER_ERROR)) { - client.admin().cluster().updateSettings(request, updateSettingListener(status)) - } - } - - private fun updateSettingListener(status: Int): ActionListener { - return object : ActionListener { - override fun onFailure(e: Exception) { - logger.error("Failed to update template migration setting to $status", e) - throw IndexManagementException.wrap(Exception("Failed to update template migration setting to $status")) - } - - override fun onResponse(response: ClusterUpdateSettingsResponse) { - if (!response.isAcknowledged) { - logger.error("Update metadata migration setting to $status is not acknowledged") - throw IndexManagementException.wrap( - Exception("Update metadata migration setting to $status is not acknowledged") - ) - } else { - logger.info("Successfully metadata template migration setting to $status") - } - } - } - } - - private suspend fun indexMetadatas(requests: List>) { - if (requests.isEmpty()) return - var requestsToRetry = requests - - // when we try to index sth to config index - // we need to make sure the schema is up to date - if (!imIndices.attemptUpdateConfigIndexMapping()) { - logger.error("Failed to update config index mapping.") - return - } - - retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { - val bulkRequest = BulkRequest().add(requestsToRetry) - val bulkResponse: BulkResponse = client.suspendUntil { 
bulk(bulkRequest, it) } - val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed } - - val retryIndexUuids = mutableListOf() - bulkResponse.items.forEach { - val indexUuid = revertManagedIndexMetadataID(it.id) - if (it.isFailed) { - if (it.status() == RestStatus.TOO_MANY_REQUESTS) { - retryIndexUuids.add(it.itemId) - } else { - logger.error("failed reason: ${it.failure}, ${it.failureMessage}") - failedToIndexIndices[indexUuid] = it.failure - } - } else { - successfullyIndexedIndices.add(indexUuid) - failedToIndexIndices.remove(indexUuid) - } - } - requestsToRetry = retryIndexUuids.map { bulkRequest.requests()[it] } - - if (requestsToRetry.isNotEmpty()) { - val retryCause = failedResponses.first { it.status() == RestStatus.TOO_MANY_REQUESTS }.failure.cause - throw ExceptionsHelper.convertToOpenSearchException(retryCause) - } - } - } - - private suspend fun cleanMetadatas(indices: List) { - if (indices.isEmpty()) return - - val request = UpdateManagedIndexMetaDataRequest(indicesToRemoveManagedIndexMetaDataFrom = indices) - try { - retryPolicy.retry(logger) { - val response: AcknowledgedResponse = - client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) } - if (response.isAcknowledged) { - failedToCleanIndices.removeAll(indices) - } else { - logger.error("Failed to clean cluster state metadata for indices: [$indices].") - failedToCleanIndices.addAll(indices) - } - } - } catch (e: Exception) { - logger.error("Failed to clean cluster state metadata for indices: [$indices].", e) - failedToCleanIndices.addAll(indices) - } - } -} - -typealias MetadataDocID = String -typealias IndexUuid = String -typealias IndexName = String diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServices.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServices.kt deleted file mode 100644 index 542ac5647..000000000 --- 
a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServices.kt +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.migration - -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.withContext -import org.apache.logging.log4j.LogManager -import org.opensearch.ExceptionsHelper -import org.opensearch.action.ActionListener -import org.opensearch.action.ActionRequestValidationException -import org.opensearch.action.DocWriteResponse -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse -import org.opensearch.action.admin.indices.template.post.SimulateIndexTemplateResponse -import org.opensearch.action.admin.indices.template.post.SimulateTemplateAction -import org.opensearch.action.bulk.BackoffPolicy -import org.opensearch.action.bulk.BulkItemResponse -import org.opensearch.action.get.MultiGetRequest -import org.opensearch.action.get.MultiGetResponse -import org.opensearch.action.update.UpdateRequest -import org.opensearch.action.update.UpdateResponse -import org.opensearch.client.Client -import org.opensearch.cluster.metadata.Template -import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.io.stream.BytesStreamOutput -import org.opensearch.common.io.stream.StreamInput -import org.opensearch.common.settings.Settings -import org.opensearch.common.unit.TimeValue -import org.opensearch.common.xcontent.LoggingDeprecationHandler -import org.opensearch.core.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.XContentHelper -import org.opensearch.common.xcontent.XContentType -import org.opensearch.indexmanagement.IndexManagementIndices -import org.opensearch.indexmanagement.IndexManagementPlugin -import 
org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.indexstatemanagement.util.updateISMTemplateRequest -import org.opensearch.indexmanagement.opensearchapi.parseWithType -import org.opensearch.indexmanagement.opensearchapi.retry -import org.opensearch.indexmanagement.opensearchapi.suspendUntil -import org.opensearch.indexmanagement.util.OpenForTesting -import org.opensearch.rest.RestStatus -import java.time.Instant - -@OpenForTesting -@Suppress("MagicNumber", "ReturnCount", "ThrowsCount", "TooManyFunctions", "ComplexMethod", "NestedBlockDepth") -class ISMTemplateService( - private val client: Client, - private val clusterService: ClusterService, - private val xContentRegistry: NamedXContentRegistry, - private val imIndices: IndexManagementIndices -) { - - private val logger = LogManager.getLogger(javaClass) - - @Volatile final var finishFlag = false - private set - fun reenableTemplateMigration() { finishFlag = false } - - @Volatile var runTimeCounter = 0 - - private var ismTemplateMap = mutableMapOf>() - private val v1TemplatesWithPolicyID = mutableMapOf() - - private val negOrderToPositive = mutableMapOf() - private val v1orderToTemplatesName = mutableMapOf>() - private val v1orderToBucketIncrement = mutableMapOf() - - private val policiesToUpdate = mutableMapOf() - private val policiesFailedToUpdate = mutableMapOf() - private lateinit var lastUpdatedTime: Instant - - @Volatile private var retryPolicy = - BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(50), 3) - - suspend fun doMigration(timeStamp: Instant) { - if (runTimeCounter >= 10) { - stopMigration(-2) - return - } - logger.info("Doing ISM template migration ${++runTimeCounter} time.") - cleanCache() - - lastUpdatedTime = timeStamp.minusSeconds(3600) - logger.info("Use 
$lastUpdatedTime as migrating ISM template last_updated_time") - - getIndexTemplates() - logger.info("ISM templates: $ismTemplateMap") - - getISMPolicies() - logger.info("Policies to update: ${policiesToUpdate.keys}") - - updateISMPolicies() - - if (policiesToUpdate.isEmpty()) { - stopMigration(-1) - } - } - - private fun stopMigration(successFlag: Long) { - finishFlag = true - val newSetting = Settings.builder().put(ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL.key, successFlag) - val request = ClusterUpdateSettingsRequest().persistentSettings(newSetting) - client.admin().cluster().updateSettings(request, updateSettingListener()) - logger.info("Failure experienced when migrating ISM Template and update ISM policies: $policiesFailedToUpdate") - // TODO what if update setting failed, cannot reset to -1/-2 - runTimeCounter = 0 - } - - private fun updateSettingListener(): ActionListener { - return object : ActionListener { - override fun onFailure(e: Exception) { - logger.error("Failed to update template migration setting", e) - } - - override fun onResponse(response: ClusterUpdateSettingsResponse) { - if (!response.isAcknowledged) { - logger.error("Update template migration setting is not acknowledged") - } else { - logger.info("Successfully update template migration setting") - } - } - } - } - - private suspend fun getIndexTemplates() { - processNegativeOrder() - - bucketizeV1TemplatesByOrder() - populateBucketPriority() - populateV1Template() - - clusterService.state().metadata.templatesV2().forEach { - val template = it.value - val indexPatterns = template.indexPatterns() - val priority = template.priorityOrZero().toInt() - val policyIDSetting = simulateTemplate(it.key) - if (policyIDSetting != null) { - populateV2ISMTemplateMap(policyIDSetting, indexPatterns, priority) - } - } - } - - // old v1 template can have negative priority - // map the negative priority to non-negative value - private fun processNegativeOrder() { - val negOrderSet = mutableSetOf() - 
clusterService.state().metadata.templates.forEach { - val policyIDSetting = ManagedIndexSettings.POLICY_ID.get(it.value.settings()) - if (policyIDSetting != "") { - val priority = it.value.order - if (priority < 0) { - negOrderSet.add(priority) - } - // cache pattern and policyID for v1 template - v1TemplatesWithPolicyID[it.key] = V1TemplateCache(it.value.patterns(), 0, policyIDSetting) - } - } - val sorted = negOrderSet.sorted() - var p = 0 - for (i in sorted) { - negOrderToPositive[i] = p++ - } - } - - private fun normalizePriority(order: Int): Int { - if (order < 0) return negOrderToPositive[order] ?: 0 - return order + (negOrderToPositive.size) - } - - private fun bucketizeV1TemplatesByOrder() { - clusterService.state().metadata.templates.forEach { - val v1TemplateCache = v1TemplatesWithPolicyID[it.key] - if (v1TemplateCache != null) { - val priority = normalizePriority(it.value.order) - // cache the non-negative priority - v1TemplatesWithPolicyID[it.key] = v1TemplateCache.copy(order = priority) - - val bucket = v1orderToTemplatesName[priority] - if (bucket == null) { - v1orderToTemplatesName[priority] = mutableListOf(it.key) - } else { - // add later one to start of the list - bucket.add(0, it.key) - } - } - } - } - - private fun populateBucketPriority() { - v1orderToTemplatesName.forEach { (order, templateNames) -> - var increase = 0 - templateNames.forEach { - val v1TemplateCache = v1TemplatesWithPolicyID[it] - if (v1TemplateCache != null) { - val cachePriority = v1TemplateCache.order - v1TemplatesWithPolicyID[it] = v1TemplateCache - .copy(order = cachePriority + increase) - } - increase++ - } - v1orderToBucketIncrement[order] = templateNames.size - 1 - } - } - - private fun populateV1Template() { - val allOrders = v1orderToTemplatesName.keys.toList().sorted() - allOrders.forEachIndexed { ind, order -> - val smallerOrders = allOrders.subList(0, ind) - val increments = smallerOrders.mapNotNull { v1orderToBucketIncrement[it] }.sum() - - val templates = 
v1orderToTemplatesName[order] - templates?.forEach { - val v1TemplateCache = v1TemplatesWithPolicyID[it] - if (v1TemplateCache != null) { - val policyID = v1TemplateCache.policyID - val indexPatterns = v1TemplateCache.patterns - val priority = v1TemplateCache.order + increments - saveISMTemplateToMap(policyID, ISMTemplate(indexPatterns, priority, lastUpdatedTime)) - } - } - } - } - - private fun saveISMTemplateToMap(policyID: String, ismTemplate: ISMTemplate) { - val policyToISMTemplate = ismTemplateMap[policyID] - if (policyToISMTemplate != null) { - policyToISMTemplate.add(ismTemplate) - } else { - ismTemplateMap[policyID] = mutableListOf(ismTemplate) - } - } - - private suspend fun simulateTemplate(templateName: String): String? { - val request = SimulateTemplateAction.Request(templateName) - val response: SimulateIndexTemplateResponse = - client.suspendUntil { execute(SimulateTemplateAction.INSTANCE, request, it) } - - var policyIDSetting: String? = null - withContext(Dispatchers.IO) { - val out = BytesStreamOutput().also { response.writeTo(it) } - val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) - val resolvedTemplate = sin.readOptionalWriteable(::Template) - if (resolvedTemplate != null) { - policyIDSetting = ManagedIndexSettings.POLICY_ID.get(resolvedTemplate.settings()) - } - } - return policyIDSetting - } - - private fun populateV2ISMTemplateMap(policyID: String, indexPatterns: List, priority: Int) { - var v1Increment = 0 - val v1MaxOrder = v1orderToBucketIncrement.keys.maxOrNull() - if (v1MaxOrder != null) { - v1Increment = v1MaxOrder + v1orderToBucketIncrement.values.sum() - } - - saveISMTemplateToMap(policyID, ISMTemplate(indexPatterns, normalizePriority(priority) + v1Increment, lastUpdatedTime)) - } - - private suspend fun getISMPolicies() { - if (ismTemplateMap.isEmpty()) return - - val mReq = MultiGetRequest() - ismTemplateMap.keys.forEach { mReq.add(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, it) } - try { - val mRes: MultiGetResponse = 
client.suspendUntil { multiGet(mReq, it) } - policiesToUpdate.clear() - mRes.forEach { - if (it.response != null && !it.response.isSourceEmpty && !it.isFailed) { - val response = it.response - var policy: Policy? = null - try { - policy = XContentHelper.createParser( - xContentRegistry, - LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, - XContentType.JSON - ).use { xcp -> - xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, Policy.Companion::parse) - } - } catch (e: Exception) { - logger.error("Failed to parse policy [${response.id}] when migrating templates", e) - } - - if (policy?.ismTemplate == null) { - policiesToUpdate[it.id] = Pair(response.seqNo, response.primaryTerm) - } - } - } - } catch (e: ActionRequestValidationException) { - logger.warn("ISM config index not exists when migrating templates.") - } - } - - private suspend fun updateISMPolicies() { - if (policiesToUpdate.isEmpty()) return - - if (!imIndices.attemptUpdateConfigIndexMapping()) { - logger.error("Failed to update config index mapping.") - return - } - - var requests = mutableListOf() - policiesToUpdate.forEach { policyID, (seqNo, priTerm) -> - val ismTemplates = ismTemplateMap[policyID] - if (ismTemplates != null) - requests.add(updateISMTemplateRequest(policyID, ismTemplates, seqNo, priTerm)) - } - - retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { - val failedRequests = mutableListOf() - var retryCause: Exception? = null - requests.forEach { req -> - var res: UpdateResponse? 
= null - try { - res = client.suspendUntil { update(req, it) } - logger.info("update policy for ${req.id()}") - if (res?.result == DocWriteResponse.Result.UPDATED) { - policiesToUpdate.remove(req.id()) - } - } catch (e: Exception) { - logger.info("failed to update policy for ${req.id()}") - if (res?.status() == RestStatus.TOO_MANY_REQUESTS) { - failedRequests.add(req) - retryCause = e - } else { - logger.error("Failed to update policy ${req.id()} with ISM template", e) - } - } - } - - if (failedRequests.isNotEmpty()) { - requests = failedRequests - throw ExceptionsHelper.convertToOpenSearchException(retryCause) - } - } - } - - private fun cleanCache() { - ismTemplateMap.clear() - v1TemplatesWithPolicyID.clear() - v1orderToTemplatesName.clear() - v1orderToBucketIncrement.clear() - negOrderToPositive.clear() - policiesToUpdate.clear() - } -} - -data class V1TemplateCache( - val patterns: List, - val order: Int, - val policyID: String -) - -typealias policyID = String -typealias templateName = String -typealias seqNoPrimaryTerm = Pair diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index 157676f00..cb7e4c6ad 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -199,8 +199,6 @@ fun XContentBuilder.buildMetadata(name: String, metadata: ToXContentFragment, pa // Get the oldest rollover time or null if index was never rolled over fun IndexMetadata.getOldestRolloverTime(): Instant? 
{ - return this.rolloverInfos.values() - .map { it.value.time } - .minOrNull() // oldest should be min as its epoch time + return this.rolloverInfos.values.minOfOrNull { it.time } // oldest should be min as its epoch time ?.let { Instant.ofEpochMilli(it) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt index a08e5beaf..9754f5d8a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt @@ -84,7 +84,8 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name, val (statsStore, statsDocs, shardStats) = getIndexStats(indexName, client) ?: return this val indexSize = statsStore.sizeInBytes // Get stats of current and target shards - val numOriginalShards = context.clusterService.state().metadata.indices[indexName].numberOfShards + val numOriginalShards = context.clusterService.state().metadata.indices[indexName]?.numberOfShards + ?: error("numOriginalShards should not be null") val numTargetShards = getNumTargetShards(numOriginalShards, indexSize) if (shouldFailTooManyDocuments(statsDocs, numTargetShards)) return this @@ -215,7 +216,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name, private fun shouldFailUnsafe(clusterService: ClusterService, indexName: String): Boolean { // If forceUnsafe is set and is true, then we don't even need to check the number of replicas if (action.forceUnsafe == true) return false - val numReplicas = clusterService.state().metadata.indices[indexName].numberOfReplicas + val numReplicas = clusterService.state().metadata.indices[indexName]?.numberOfReplicas val shouldFailForceUnsafeCheck = numReplicas == 0 if (shouldFailForceUnsafeCheck) { 
logger.info(UNSAFE_FAILURE_MESSAGE) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt index e4f27686b..943f59279 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForMoveShardsStep.kt @@ -33,7 +33,8 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name, val numShardsInSync = getNumShardsInSync(shardStats, context.clusterService.state(), indexName) val nodeToMoveOnto = localShrinkActionProperties.nodeName val numShardsOnNode = getNumShardsWithCopyOnNode(shardStats, context.clusterService.state(), nodeToMoveOnto) - val numPrimaryShards = context.clusterService.state().metadata.indices[indexName].numberOfShards + val numPrimaryShards = context.clusterService.state().metadata.indices[indexName]?.numberOfShards + ?: error("numberOfShards should not be null") // If a copy of each shard is on the node, and all shards are in sync, move on if (numShardsOnNode >= numPrimaryShards && numShardsInSync >= numPrimaryShards) { @@ -49,8 +50,8 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name, // Returns the number of shard IDs where all primary and replicas are in sync private fun getNumShardsInSync(shardStats: Array, state: ClusterState, indexName: String): Int { - val numReplicas = state.metadata.indices[indexName].numberOfReplicas - val inSyncAllocations = state.metadata.indices[indexName].inSyncAllocationIds + val numReplicas = state.metadata.indices[indexName]?.numberOfReplicas ?: error("numberOfReplicas should not be null") + val inSyncAllocations = state.metadata.indices[indexName]?.inSyncAllocationIds var numShardsInSync = 0 for (shard: ShardStats in shardStats) { val routingInfo = 
shard.shardRouting @@ -58,7 +59,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name, if (routingInfo.primary()) { // All shards must be in sync as it isn't known which shard (replica or primary) will be // moved to the target node and used in the shrink. - if (inSyncAllocations[routingInfo.id].size == (numReplicas + 1)) { + if (inSyncAllocations?.get(routingInfo.id)?.size == (numReplicas + 1)) { numShardsInSync++ } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt index 6eb783a85..906360039 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt @@ -54,7 +54,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru private suspend fun shrinkNotDone(targetIndex: String, targetNumShards: Int, client: Client, clusterService: ClusterService): Boolean { val numPrimaryShardsStarted = getNumPrimaryShardsStarted(client, targetIndex) - val numPrimaryShards = clusterService.state().metadata.indices[targetIndex].numberOfShards + val numPrimaryShards = clusterService.state().metadata.indices[targetIndex]?.numberOfShards return numPrimaryShards != targetNumShards || numPrimaryShardsStarted != targetNumShards } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index a007e2868..3ea39159c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ 
b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -269,7 +269,7 @@ class TransportExplainAction @Inject constructor( clusterStateRequest, object : ActionListener { override fun onResponse(response: ClusterStateResponse) { - val clusterStateIndexMetadatas = response.state.metadata.indices.associate { it.key to it.value } + val clusterStateIndexMetadatas = response.state.metadata.indices getMetadataMap(clusterStateIndexMetadatas, threadContext) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt index f5f5a9ae7..e3fb93dc0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt @@ -76,7 +76,7 @@ class TransportUpdateManagedIndexMetaDataAction @Inject constructor( .toTypedArray() } - override fun masterOperation( + override fun clusterManagerOperation( request: UpdateManagedIndexMetaDataRequest, state: ClusterState, listener: ActionListener diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt index 2f44f4389..8ac3b365e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt @@ -343,9 +343,9 @@ class RollupMapperService( return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause) } - val indexMapping: MappingMetadata = 
res.mappings[targetIndexResolvedName] + val indexMapping: MappingMetadata? = res.mappings[targetIndexResolvedName] - return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) { + return if (((indexMapping?.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) { RollupJobValidationResult.Valid } else { RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt index 38664e41c..2f0caf351 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt @@ -57,7 +57,7 @@ class TransportUpdateRollupMappingAction @Inject constructor( } @Suppress("ReturnCount", "LongMethod") - override fun masterOperation( + override fun clusterManagerOperation( request: UpdateRollupMappingRequest, state: ClusterState, listener: ActionListener diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt index fe2c38801..f800b54cd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt @@ -57,9 +57,9 @@ object RollupFieldValueExpressionResolver { open class IndexAliasUtils(val clusterService: ClusterService) { open fun hasAlias(index: String): Boolean { - val aliases = 
this.clusterService.state().metadata().indices.get(index)?.aliases + val aliases = this.clusterService.state().metadata().indices[index]?.aliases if (aliases != null) { - return aliases.size() > 0 + return aliases.isNotEmpty() } return false } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt index 5c5af20f3..f0a2c3a23 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt @@ -80,10 +80,10 @@ class IndexUtils { return DEFAULT_SCHEMA_VERSION } - fun shouldUpdateIndex(index: IndexMetadata, newVersion: Long): Boolean { + fun shouldUpdateIndex(index: IndexMetadata?, newVersion: Long): Boolean { var oldVersion = DEFAULT_SCHEMA_VERSION - val indexMapping = index.mapping()?.sourceAsMap() + val indexMapping = index?.mapping()?.sourceAsMap() if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is HashMap<*, *>) { val metaData = indexMapping[_META] as HashMap<*, *> if (metaData.containsKey(SCHEMA_VERSION)) { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt deleted file mode 100644 index 8d37f4805..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement - -import org.apache.hc.core5.http.ContentType -import org.apache.hc.core5.http.io.entity.StringEntity -import org.junit.After -import org.junit.Before -import org.opensearch.OpenSearchParseException -import org.opensearch.action.ActionRequest -import org.opensearch.action.ActionResponse 
-import org.opensearch.action.admin.cluster.reroute.ClusterRerouteRequest -import org.opensearch.action.search.SearchResponse -import org.opensearch.client.Request -import org.opensearch.client.Response -import org.opensearch.cluster.metadata.IndexMetadata -import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand -import org.opensearch.common.Strings -import org.opensearch.common.settings.Settings -import org.opensearch.core.xcontent.DeprecationHandler -import org.opensearch.common.xcontent.LoggingDeprecationHandler -import org.opensearch.core.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.XContentHelper -import org.opensearch.core.xcontent.XContentParser -import org.opensearch.common.xcontent.XContentParserUtils -import org.opensearch.common.xcontent.XContentType -import org.opensearch.common.xcontent.json.JsonXContent -import org.opensearch.indexmanagement.IndexManagementPlugin -import org.opensearch.indexmanagement.IndexManagementRestTestCase.Companion.wipeAllIndices -import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestExplainAction -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.util.TOTAL_MANAGED_INDICES -import org.opensearch.indexmanagement.makeRequest -import 
org.opensearch.indexmanagement.opensearchapi.parseWithType -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData -import org.opensearch.indexmanagement.waitFor -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule -import org.opensearch.plugins.ActionPlugin -import org.opensearch.plugins.Plugin -import org.opensearch.rest.RestRequest -import org.opensearch.rest.RestStatus -import org.opensearch.test.OpenSearchIntegTestCase -import org.opensearch.test.rest.OpenSearchRestTestCase.entityAsMap -import java.io.IOException -import java.time.Duration -import java.time.Instant - -abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() { - - @After - fun clearIndicesAfterEachTest() { - wipeAllIndices(getRestClient()) - } - - @Before - fun disableIndexStateManagementJitter() { - // jitter would add a test-breaking delay to the integration tests - updateIndexStateManagementJitterSetting(0.0) - } - - protected val isMixedNodeRegressionTest = System.getProperty("cluster.mixed", "false")!!.toBoolean() - - var metadataToClusterState = ManagedIndexMetaData( - index = "", - indexUuid = "", - policyID = "", - policySeqNo = 0, - policyPrimaryTerm = 1, - policyCompleted = false, - rolledOver = false, - indexCreationDate = null, - transitionTo = null, - stateMetaData = StateMetaData("ReplicaCountState", 1234), - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Happy moving") - ) - - override fun nodePlugins(): Collection> { - return listOf(IndexManagementPlugin::class.java) - } - - class TestPlugin : ActionPlugin, Plugin() { - override fun getActions(): List> { - return listOf( - ActionPlugin.ActionHandler( - UpdateManagedIndexMetaDataAction.INSTANCE, - 
TransportUpdateManagedIndexMetaDataAction::class.java - ), - ActionPlugin.ActionHandler(ExplainAction.INSTANCE, TransportExplainAction::class.java) - ) - } - } - - // TODO: ...convert into a test REST plugin that allows us to execute the transport action? -// override fun transportClientPlugins(): Collection> { -// return listOf(TestPlugin::class.java) -// } - - protected fun getIndexMetadata(indexName: String): IndexMetadata { - return client().admin().cluster().prepareState() - .setIndices(indexName) - .setMetadata(true).get() - .state.metadata.indices[indexName] - } - - // reuse utility fun from RestTestCase - fun createPolicy( - policy: Policy, - policyId: String = randomAlphaOfLength(10), - refresh: Boolean = true - ): Policy { - val response = createPolicyJson(policy.toJsonString(), policyId, refresh) - - val policyJson = JsonXContent.jsonXContent - .createParser( - NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, - response.entity.content - ).map() - val createdId = policyJson["_id"] as String - assertEquals("policy ids are not the same", policyId, createdId) - return policy.copy( - id = createdId, - seqNo = (policyJson["_seq_no"] as Int).toLong(), - primaryTerm = (policyJson["_primary_term"] as Int).toLong() - ) - } - - protected fun createPolicyJson( - policyString: String, - policyId: String, - refresh: Boolean = true - ): Response { - val response = getRestClient() - .makeRequest( - "PUT", - "${IndexManagementPlugin.POLICY_BASE_URI}/$policyId?refresh=$refresh", - emptyMap(), - StringEntity(policyString, ContentType.APPLICATION_JSON) - ) - assertEquals("Unable to create a new policy", RestStatus.CREATED, response.restStatus()) - return response - } - - protected fun Response.restStatus(): RestStatus = RestStatus.fromCode(this.statusLine.statusCode) - - protected fun addPolicyToIndex( - index: String, - policyID: String - ) { - val body = """ - { - "policy_id": "$policyID" - } - """.trimIndent() - val response = getRestClient() - 
.makeRequest("POST", "/_opendistro/_ism/add/$index", StringEntity(body, ContentType.APPLICATION_JSON)) - assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) - } - - protected fun getExistingManagedIndexConfig(index: String): ManagedIndexConfig { - return waitFor { - val config = getManagedIndexConfig(index) - assertNotNull("ManagedIndexConfig is null", config) - config!! - } - } - - protected fun getManagedIndexConfig(index: String): ManagedIndexConfig? { - val request = """ - { - "seq_no_primary_term": true, - "query": { - "term": { - "${ManagedIndexConfig.MANAGED_INDEX_TYPE}.${ManagedIndexConfig.INDEX_FIELD}": "$index" - } - } - } - """.trimIndent() - val response = getRestClient().makeRequest( - "POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_search", emptyMap(), - StringEntity(request, ContentType.APPLICATION_JSON) - ) - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - val searchResponse = - SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, response.entity.content)) - assertTrue("Found more than one managed index config", searchResponse.hits.hits.size < 2) - val hit = searchResponse.hits.hits.firstOrNull() - return hit?.run { - val xcp = createParser(JsonXContent.jsonXContent, this.sourceRef) - xcp.parseWithType(id, seqNo, primaryTerm, ManagedIndexConfig.Companion::parse) - } - } - - protected fun updateManagedIndexConfigStartTime(update: ManagedIndexConfig, desiredStartTimeMillis: Long? 
= null) { - val intervalSchedule = (update.jobSchedule as IntervalSchedule) - val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() - val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis - val response = getRestClient().makeRequest( - "POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}", - StringEntity( - "{\"doc\":{\"managed_index\":{\"schedule\":{\"interval\":{\"start_time\":" + - "\"$startTimeMillis\"}}}}}", - ContentType.APPLICATION_JSON - ) - ) - - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - } - - protected fun updateManagedIndexConfigPolicy(update: ManagedIndexConfig, policy: Policy) { - val policyJsonString = policy.toJsonString() - logger.info("policy string: $policyJsonString") - var response = getRestClient().makeRequest( - "POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}", - StringEntity( - "{\"doc\":{\"managed_index\": $policyJsonString }}", - ContentType.APPLICATION_JSON - ) - ) - - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - - response = getRestClient().makeRequest( - "POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}", - StringEntity( - "{\"doc\":{\"managed_index\": {\"policy_seq_no\": \"0\", \"policy_primary_term\": \"1\"} }}", - ContentType.APPLICATION_JSON - ) - ) - - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - } - - @Suppress("UNCHECKED_CAST") - protected fun getNumberOfReplicasSetting(indexName: String): Int { - val indexSettings = getIndexSettings(indexName) as Map>> - return (indexSettings[indexName]!!["settings"]!!["index.number_of_replicas"] as String).toInt() - } - - @Throws(IOException::class) - protected open fun getIndexSettings(index: String): Map? 
{ - val request = Request("GET", "/$index/_settings") - request.addParameter("flat_settings", "true") - val response = getRestClient().performRequest(request) - response.entity.content.use { `is` -> - return XContentHelper.convertToMap( - XContentType.JSON.xContent(), - `is`, - true - ) - } - } - - protected fun getExplainManagedIndexMetaData(indexName: String): ManagedIndexMetaData { - if (indexName.contains("*") || indexName.contains(",")) { - throw IllegalArgumentException("This method is only for a single concrete index") - } - - val response = getRestClient().makeRequest( - RestRequest.Method.GET.toString(), - "${RestExplainAction.EXPLAIN_BASE_URI}/$indexName" - ) - assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) - - lateinit var metadata: ManagedIndexMetaData - val xcp = createParser(XContentType.JSON.xContent(), response.entity.content) - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, - xcp.nextToken(), - xcp - ) - while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { - xcp.currentName() - xcp.nextToken() - if (xcp.currentName() == TOTAL_MANAGED_INDICES) xcp.intValue() - else metadata = ManagedIndexMetaData.parse(xcp) - } - return metadata - } - - protected fun assertIndexExists(index: String) { - val response = getRestClient().makeRequest("HEAD", index) - assertEquals("Index $index does not exist.", RestStatus.OK, response.restStatus()) - } - - fun getShardSegmentStats(index: String): Map { - val response = getRestClient().makeRequest("GET", "/$index/_stats/segments?level=shards") - - assertEquals("Stats request failed", RestStatus.OK, response.restStatus()) - - return response.asMap() - } - - fun catIndexShard(index: String): List { - val response = getRestClient().makeRequest("GET", "_cat/shards/$index?format=json") - - assertEquals("Stats request failed", RestStatus.OK, response.restStatus()) - - try { - return JsonXContent.jsonXContent - .createParser( - NamedXContentRegistry.EMPTY, - 
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - response.entity.content - ) - .use { parser -> parser.list() } - } catch (e: IOException) { - throw OpenSearchParseException("Failed to parse content to list", e) - } - } - - fun Response.asMap(): Map = entityAsMap(this) - - fun rerouteShard(configIndexName: String, fromNode: String, toNode: String) { - logger.info("Reallocating Shard. From Node: $fromNode To Node: $toNode ") - val moveCommand = MoveAllocationCommand(configIndexName, 0, fromNode, toNode) - val rerouteResponse = client().admin().cluster() - .reroute(ClusterRerouteRequest().add(moveCommand)).actionGet() - logger.info("reroute success? ${rerouteResponse.isAcknowledged}") - } - - fun updateIndexSettings(index: String, settings: Settings) { - val request = Request("PUT", "/$index/_settings") - request.setJsonEntity(Strings.toString(XContentType.JSON, settings)) - getRestClient().performRequest(request) - } - - fun updateClusterSetting(key: String, value: String?, escapeValue: Boolean = true) { - val formattedValue = if (escapeValue) "\"$value\"" else value - val request = """ - { - "persistent": { - "$key": $formattedValue - } - } - """.trimIndent() - val res = getRestClient().makeRequest( - "PUT", "_cluster/settings", emptyMap(), - StringEntity(request, ContentType.APPLICATION_JSON) - ) - assertEquals("Request failed", RestStatus.OK, res.restStatus()) - } - - protected fun updateIndexStateManagementJitterSetting(value: Double?) 
{ - updateClusterSetting(ManagedIndexSettings.JITTER.key, value.toString(), false) - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.kt deleted file mode 100644 index e18aa6c33..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.kt +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement - -import com.carrotsearch.randomizedtesting.RandomizedTest.sleep -import org.junit.After -import org.junit.Assume -import org.junit.Before -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.cluster.metadata.IndexMetadata -import org.opensearch.common.settings.Settings -import org.opensearch.index.Index -import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX -import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction.Companion.METADATA_CORRUPT_WARNING -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction.Companion.METADATA_MOVING_WARNING -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest -import org.opensearch.indexmanagement.waitFor -import java.time.Instant 
-import java.time.temporal.ChronoUnit -import java.util.Locale -import kotlin.collections.HashMap - -class MetadataRegressionIT : IndexStateManagementIntegTestCase() { - - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - @Before - fun startMetadataService() { - // metadata service could be stopped before following tests start run - // this will enable metadata service again - updateClusterSetting(ManagedIndexSettings.METADATA_SERVICE_STATUS.key, "-1") - updateClusterSetting(ManagedIndexSettings.METADATA_SERVICE_STATUS.key, "0") - } - - @After - fun cleanClusterSetting() { - // need to clean up otherwise will throw error - updateClusterSetting(ManagedIndexSettings.METADATA_SERVICE_STATUS.key, null, false) - updateClusterSetting(ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL.key, null, false) - updateIndexStateManagementJitterSetting(null) - } - - fun `test move metadata service`() { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = ReplicaCountAction(10, 0) - val states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName) - - // create a job - addPolicyToIndex(indexName, policyID) - - // put some metadata into cluster state - var indexMetadata = getIndexMetadata(indexName) - metadataToClusterState = metadataToClusterState.copy( - index = indexName, - indexUuid = indexMetadata.indexUUID, - policyID = policyID - ) - val request = UpdateManagedIndexMetaDataRequest( - indicesToAddManagedIndexMetaDataTo = listOf( - Pair(Index(metadataToClusterState.index, metadataToClusterState.indexUuid), 
metadataToClusterState) - ) - ) - val response: AcknowledgedResponse = client().execute( - UpdateManagedIndexMetaDataAction.INSTANCE, request - ).get() - logger.info(response.isAcknowledged) - indexMetadata = getIndexMetadata(indexName) - logger.info("check if metadata is saved in cluster state: ${indexMetadata.getCustomData("managed_index_metadata")}") - - // TODO increase wait time since flaky seeing here. After looking through the log - // it's more likely a test framework execution lag. - waitFor(Instant.ofEpochSecond(60)) { - assertEquals( - METADATA_MOVING_WARNING, - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - - waitFor(Instant.ofEpochSecond(120)) { - assertEquals( - "Happy moving", - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - assertEquals(null, getIndexMetadata(indexName).getCustomData("managed_index_metadata")) - } - - logger.info("metadata has moved") - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - // Change the start time so the job will trigger in 2 seconds, since there is metadata and policy with the index there is no initialization - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - waitFor { - assertEquals( - "Index did not set number_of_replicas to ${actionConfig.numOfReplicas}", - actionConfig.numOfReplicas, - getNumberOfReplicasSetting(indexName) - ) - } - } - - fun `test job can continue run from cluster state metadata`() { - /** - * new version of ISM plugin can handle metadata in cluster state - * when job already started - * - * create index, add policy to it - * manually add policy field to managed-index so runner won't do initialisation itself - * add metadata into cluster state - * then check if we can continue run from this added metadata - */ - - val indexName = "${testIndexName}_index_2" - val policyID = "${testIndexName}_testPolicyName_2" - val actionConfig = 
ReplicaCountAction(10, 0) - val states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName) - addPolicyToIndex(indexName, policyID) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - // manually add policy field into managed-index - updateManagedIndexConfigPolicy(managedIndexConfig, policy) - logger.info("managed-index: ${getExistingManagedIndexConfig(indexName)}") - - // manually save metadata into cluster state - var indexMetadata = getIndexMetadata(indexName) - metadataToClusterState = metadataToClusterState.copy( - index = indexName, - indexUuid = indexMetadata.indexUUID, - policyID = policyID - ) - val request = UpdateManagedIndexMetaDataRequest( - indicesToAddManagedIndexMetaDataTo = listOf( - Pair(Index(metadataToClusterState.index, metadataToClusterState.indexUuid), metadataToClusterState) - ) - ) - val response: AcknowledgedResponse = client().execute( - UpdateManagedIndexMetaDataAction.INSTANCE, request - ).get() - - logger.info(response.isAcknowledged) - indexMetadata = getIndexMetadata(indexName) - logger.info("check if metadata is saved in cluster state: ${indexMetadata.getCustomData("managed_index_metadata")}") - - waitFor { - assertEquals( - METADATA_MOVING_WARNING, - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - - waitFor(Instant.ofEpochSecond(120)) { - assertEquals( - "Happy moving", - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - assertEquals(null, getIndexMetadata(indexName).getCustomData("managed_index_metadata")) - } - - logger.info("metadata has moved") - - // start the job run - 
updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - assertEquals( - "Index did not set number_of_replicas to ${actionConfig.numOfReplicas}", - actionConfig.numOfReplicas, - getNumberOfReplicasSetting(indexName) - ) - } - } - - fun `test clean corrupt metadata`() { - val indexName = "${testIndexName}_index_3" - val policyID = "${testIndexName}_testPolicyName_3" - val action = ReplicaCountAction(10, 0) - val states = listOf(State(name = "ReplicaCountState", actions = listOf(action), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName) - - // create a job - addPolicyToIndex(indexName, policyID) - - // put some metadata into cluster state - val indexMetadata = getIndexMetadata(indexName) - metadataToClusterState = metadataToClusterState.copy( - index = indexName, - indexUuid = "randomindexuuid", - policyID = policyID - ) - val request = UpdateManagedIndexMetaDataRequest( - indicesToAddManagedIndexMetaDataTo = listOf( - Pair(Index(indexName, indexMetadata.indexUUID), metadataToClusterState) - ) - ) - client().execute(UpdateManagedIndexMetaDataAction.INSTANCE, request).get() - logger.info("check if metadata is saved in cluster state: ${getIndexMetadata(indexName).getCustomData("managed_index_metadata")}") - - waitFor { - assertEquals( - METADATA_CORRUPT_WARNING, - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - - waitFor(Instant.ofEpochSecond(120)) { - assertEquals(null, getExplainManagedIndexMetaData(indexName).info?.get("message")) - assertEquals(null, getIndexMetadata(indexName).getCustomData("managed_index_metadata")) - } - - logger.info("corrupt metadata has been cleaned") - } - - fun `test new node skip execution when 
old node exist in cluster`() { - Assume.assumeTrue(isMixedNodeRegressionTest) - - /** - * mixedCluster-0 is new node, mixedCluster-1 is old node - * - * set config index to only have one shard on new node - * so old node cannot run job because it has no shard - * new node also cannot run job because there is an old node - * here we check no job can be run - * - * then reroute shard to old node and this old node can run job - */ - - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = ReplicaCountAction(10, 0) - val states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName) - - val settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .build() - updateIndexSettings(INDEX_MANAGEMENT_INDEX, settings) - - // check config index shard position - val shardsResponse = catIndexShard(INDEX_MANAGEMENT_INDEX) - logger.info("check config index shard: $shardsResponse") - val shardNode = (shardsResponse[0] as HashMap<*, *>)["node"] - - sleep(3000) // wait some time for cluster to be stable - - // move shard on node1 to node0 if exist - if (shardNode == "mixedCluster-1") rerouteShard(INDEX_MANAGEMENT_INDEX, "mixedCluster-1", "mixedCluster-0") - - addPolicyToIndex(indexName, policyID) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - updateManagedIndexConfigStartTime(managedIndexConfig) - - // check no job has been run - wait { assertEquals(null, getExistingManagedIndexConfig(indexName).policy) } - - // reroute shard to node1 - rerouteShard(INDEX_MANAGEMENT_INDEX, "mixedCluster-0", 
"mixedCluster-1") - - val shardsResponse2 = catIndexShard(INDEX_MANAGEMENT_INDEX) - logger.info("check config index shard: $shardsResponse2") - - // job can be ran now - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataServiceTests.kt deleted file mode 100644 index ecbfda5f4..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataServiceTests.kt +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement - -import com.nhaarman.mockitokotlin2.any -import com.nhaarman.mockitokotlin2.doAnswer -import com.nhaarman.mockitokotlin2.doReturn -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.never -import com.nhaarman.mockitokotlin2.verify -import com.nhaarman.mockitokotlin2.whenever -import kotlinx.coroutines.runBlocking -import org.junit.Before -import org.opensearch.action.ActionListener -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse -import org.opensearch.client.AdminClient -import org.opensearch.client.Client -import org.opensearch.client.ClusterAdminClient -import org.opensearch.cluster.ClusterState -import org.opensearch.cluster.metadata.Metadata -import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.collect.ImmutableOpenMap -import org.opensearch.indexmanagement.IndexManagementIndices -import org.opensearch.test.OpenSearchTestCase -import kotlin.test.assertFailsWith - -class MetadataServiceTests : OpenSearchTestCase() { - - private val clusterService: ClusterService = mock() - private val clusterState: ClusterState = mock() - private val 
metadata: Metadata = mock() - private val imIndices: IndexManagementIndices = mock() - - private val ex = Exception() - - @Before - fun setup() { - whenever(clusterService.state()).doReturn(clusterState) - whenever(clusterState.metadata).doReturn(metadata) - whenever(metadata.indices).doReturn(ImmutableOpenMap.of()) - } - - fun `test config index not exists`() = runBlocking { - whenever(imIndices.indexManagementIndexExists()).doReturn(false) - - val client = getClient( - getAdminClient( - getClusterAdminClient( - updateSettingResponse = null, - updateSettingException = ex - ) - ) - ) - val skipFlag = SkipExecution(client) - val metadataService = MetadataService(client, clusterService, skipFlag, imIndices) - metadataService.moveMetadata() - - verify(client.admin().cluster(), never()).updateSettings(any(), any()) - assertEquals(metadataService.finishFlag, true) - } - - // If update setting to 1 failed with some exception, runTimeCounter shouldn't be increased - fun `test failed to update setting to 1`() = runBlocking { - whenever(imIndices.indexManagementIndexExists()).doReturn(true) - - val client = getClient( - getAdminClient( - getClusterAdminClient( - updateSettingResponse = null, - updateSettingException = ex - ) - ) - ) - - val skipFlag = SkipExecution(client) - val metadataService = MetadataService(client, clusterService, skipFlag, imIndices) - metadataService.moveMetadata() - assertEquals(metadataService.runTimeCounter, 2) - metadataService.moveMetadata() - assertEquals(metadataService.runTimeCounter, 3) - metadataService.moveMetadata() - assertEquals(metadataService.runTimeCounter, 4) - assertFailsWith(Exception::class) { - runBlocking { - metadataService.moveMetadata() - } - } - assertEquals(metadataService.runTimeCounter, 4) - assertEquals(metadataService.finishFlag, false) - } - - private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } - - private fun getAdminClient(clusterAdminClient: ClusterAdminClient): 
AdminClient = mock { on { cluster() } doReturn clusterAdminClient } - - private fun getClusterAdminClient( - updateSettingResponse: ClusterUpdateSettingsResponse?, - updateSettingException: Exception? - ): ClusterAdminClient { - assertTrue( - "Must provide either a getMappingsResponse or getMappingsException", - (updateSettingResponse != null).xor(updateSettingException != null) - ) - - return mock { - doAnswer { invocationOnMock -> - val listener = invocationOnMock.getArgument>(1) - if (updateSettingResponse != null) listener.onResponse(updateSettingResponse) - else listener.onFailure(updateSettingException) - }.whenever(this.mock).updateSettings(any(), any()) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt index b94eedeae..d46d5318a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt @@ -395,6 +395,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { } fun `test history shard settings`() { + deleteIndex(IndexManagementIndices.HISTORY_ALL) val indexName = "${testIndexName}_shard_settings" val policyID = "${testIndexName}_shard_settings_1" val actionConfig = ReadOnlyAction(0) @@ -429,7 +430,8 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { waitFor { assertIndexExists(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS) val indexSettings = getIndexSettings(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS) - val historyIndexName = indexSettings.keys.filter { it.startsWith(IndexManagementIndices.HISTORY_INDEX_BASE) }.firstOrNull() + val historyIndexName = + indexSettings.keys.firstOrNull { it.startsWith(IndexManagementIndices.HISTORY_INDEX_BASE) 
} assertNotNull("Could not find a concrete history index", historyIndexName) assertEquals("Wrong number of shards", 2, getNumberOfShardsSetting(historyIndexName!!)) assertEquals("Wrong number of replicas", 3, getNumberOfReplicasSetting(historyIndexName)) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt index f455fca15..59464d152 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt @@ -19,9 +19,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator -import org.opensearch.indexmanagement.indexstatemanagement.MetadataService import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService import org.opensearch.test.ClusterServiceUtils import org.opensearch.test.OpenSearchTestCase import org.opensearch.threadpool.Scheduler @@ -36,8 +34,6 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { private lateinit var settings: Settings private lateinit var indexManagementIndices: IndexManagementIndices - private lateinit var metadataService: MetadataService - private lateinit var templateService: ISMTemplateService private lateinit var coordinator: ManagedIndexCoordinator private lateinit var indexMetadataProvider: IndexMetadataProvider @@ -49,8 +45,6 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { client = 
Mockito.mock(Client::class.java) threadPool = Mockito.mock(ThreadPool::class.java) indexManagementIndices = Mockito.mock(IndexManagementIndices::class.java) - metadataService = Mockito.mock(MetadataService::class.java) - templateService = Mockito.mock(ISMTemplateService::class.java) val namedXContentRegistryEntries = arrayListOf() xContentRegistry = NamedXContentRegistry(namedXContentRegistryEntries) @@ -76,14 +70,13 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { clusterService = Mockito.spy(originClusterService) indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) coordinator = ManagedIndexCoordinator( - settings, client, clusterService, threadPool, indexManagementIndices, metadataService, - templateService, indexMetadataProvider + settings, client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider ) } fun `test after start`() { coordinator.afterStart() - Mockito.verify(threadPool, Mockito.times(2)).scheduleWithFixedDelay(Mockito.any(), Mockito.any(), Mockito.anyString()) + Mockito.verify(threadPool, Mockito.times(1)).scheduleWithFixedDelay(Mockito.any(), Mockito.any(), Mockito.anyString()) } fun `test before stop`() { @@ -100,7 +93,7 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { fun `test on cluster manager`() { coordinator.onClusterManager() - Mockito.verify(threadPool, Mockito.times(3)).scheduleWithFixedDelay(Mockito.any(), Mockito.any(), Mockito.anyString()) + Mockito.verify(threadPool, Mockito.times(1)).scheduleWithFixedDelay(Mockito.any(), Mockito.any(), Mockito.anyString()) } fun `test off cluster manager`() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServicesIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServicesIT.kt deleted file mode 100644 index b082352fe..000000000 --- 
a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServicesIT.kt +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.migration - -import org.junit.Assume -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy -import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.waitFor -import java.time.Instant - -class MigrationServicesIT : IndexStateManagementRestTestCase() { - fun `test v1 index templates different order migration`() { - val policyID1 = "p1" - val policyID2 = "p2" - createPolicy(randomPolicy(), policyID1) - createPolicy(randomPolicy(), policyID2) - createV1Template("t1", "a*", policyID1, order = -1) - createV1Template("t2", "ab*", policyID1) - createV1Template("t3", "ab*", policyID2, order = 1) - createV1Template2("t4", "ab*", order = 0) - enableISMTemplateMigration() - - waitFor(Instant.ofEpochSecond(80)) { - assertEquals(getPolicy(policyID2).ismTemplate?.first()?.indexPatterns.toString(), "[ab*]") - assertEquals(getPolicy(policyID2).ismTemplate?.first()?.priority, 2) - } - - // 1s interval to let the ism_template becomes searchable so that coordinator - // can pick it up - Thread.sleep(1_000) - // need to delete overlapping template, otherwise warning will fail the test - deleteV1Template("t1") - deleteV1Template("t2") - deleteV1Template("t3") - deleteV1Template("t4") - - val indexName = "ab_index" - createIndex(indexName, policyID = null) - waitFor { - assertPredicatesOnMetaData( - listOf( - indexName to listOf( - ManagedIndexSettings.POLICY_ID.key to policyID2::equals, - 
LegacyOpenDistroManagedIndexSettings.POLICY_ID.key to policyID2::equals - ) - ), - getExplainMap(indexName), false - ) - } - } - - fun `test v1 index templates migration`() { - // cat/templates API could return template info in different order in multi-node test - // so skip for multi-node test - Assume.assumeFalse(isMultiNode) - - val policyID1 = "p1" - val policyID2 = "p2" - createPolicy(randomPolicy(), policyID1) - createPolicy(randomPolicy(), policyID2) - createV1Template("t1", "a*", policyID1) - createV1Template("t2", "ab*", policyID2) - enableISMTemplateMigration() - - // cat templates, check t1 t2 order - val order = getTemplatesOrder() - - // t1, t2 - if (order == listOf("t1", "t2")) { - waitFor(Instant.ofEpochSecond(80)) { - assertEquals(getPolicy(policyID1).ismTemplate?.first()?.indexPatterns.toString(), "[a*]") - assertEquals(getPolicy(policyID1).ismTemplate?.first()?.priority, 1) - assertEquals(getPolicy(policyID2).ismTemplate?.first()?.indexPatterns.toString(), "[ab*]") - assertEquals(getPolicy(policyID2).ismTemplate?.first()?.priority, 0) - } - } - - // t2, t1 - if (order == listOf("t2", "t1")) { - waitFor(Instant.ofEpochSecond(80)) { - waitFor(Instant.ofEpochSecond(80)) { - assertEquals(getPolicy(policyID1).ismTemplate?.first()?.indexPatterns.toString(), "[a*]") - assertEquals(getPolicy(policyID1).ismTemplate?.first()?.priority, 0) - assertEquals(getPolicy(policyID2).ismTemplate?.first()?.indexPatterns.toString(), "[ab*]") - assertEquals(getPolicy(policyID2).ismTemplate?.first()?.priority, 1) - } - } - } - - // 1s interval to let the ism_template becomes searchable so that coordinator - // can pick it up - Thread.sleep(1_000) - deleteV1Template("t1") - deleteV1Template("t2") - - if (order == listOf("t1", "t2")) { - val indexName = "ab_index" - createIndex(indexName, policyID = null) - waitFor { - assertPredicatesOnMetaData( - listOf( - indexName to listOf( - ManagedIndexSettings.POLICY_ID.key to policyID1::equals, - 
LegacyOpenDistroManagedIndexSettings.POLICY_ID.key to policyID1::equals - ) - ), - getExplainMap(indexName), false - ) - } - } - - if (order == listOf("t2", "t1")) { - val indexName = "ab_index" - createIndex(indexName, policyID = null) - waitFor { - assertPredicatesOnMetaData( - listOf( - indexName to listOf( - ManagedIndexSettings.POLICY_ID.key to policyID2::equals, - LegacyOpenDistroManagedIndexSettings.POLICY_ID.key to policyID2::equals - ) - ), - getExplainMap(indexName), false - ) - } - } - } - - @Suppress("UNCHECKED_CAST") - private fun getTemplatesOrder(): List { - val order = catIndexTemplates().map { - val row = it as Map - row["name"] - } - return order - } - - fun `test v2 index templates migration`() { - val policyID1 = "p1" - createPolicy(randomPolicy(), policyID1) - createV2Template("t1", "a*", policyID1) - enableISMTemplateMigration() - - waitFor(Instant.ofEpochSecond(80)) { - assertEquals(getPolicy(policyID1).ismTemplate?.first()?.indexPatterns.toString(), "[a*]") - } - - // 1s interval to let the ism_template becomes searchable so that coordinator - // can pick it up - Thread.sleep(1_000) - deleteV2Template("t1") - - val indexName = "ab_index" - createIndex(indexName, policyID = null) - waitFor { - assertPredicatesOnMetaData( - listOf( - indexName to listOf( - ManagedIndexSettings.POLICY_ID.key to policyID1::equals, - LegacyOpenDistroManagedIndexSettings.POLICY_ID.key to policyID1::equals - ) - ), - getExplainMap(indexName), false - ) - } - } - - private fun enableISMTemplateMigration() { - updateClusterSetting(ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL.key, "-1") - updateClusterSetting(ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL.key, "0") - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt index aeeedd7c9..3f3114f10 100644 --- 
a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptTransitionStepTests.kt @@ -23,7 +23,6 @@ import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.Metadata import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.collect.ImmutableOpenMap import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.Settings import org.opensearch.index.shard.DocsStats @@ -50,7 +49,7 @@ class AttemptTransitionStepTests : OpenSearchTestCase() { private val indexUUID: String = "indexUuid" @Suppress("UNCHECKED_CAST") private val indexMetadata: IndexMetadata = mock { - on { rolloverInfos } doReturn ImmutableOpenMap.builder().build() + on { rolloverInfos } doReturn mapOf() on { indexUUID } doReturn indexUUID } private val metadata: Metadata = mock { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperServiceTests.kt index f606e207b..5d8004107 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperServiceTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperServiceTests.kt @@ -21,7 +21,6 @@ import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.metadata.MappingMetadata import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.collect.ImmutableOpenMap import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult import org.opensearch.test.OpenSearchTestCase @@ -292,16 +291,14 @@ class RollupMapperServiceTests : OpenSearchTestCase() { private fun getMappingResponse(indexName: String, 
emptyMapping: Boolean = false): GetMappingsResponse { val mappings = if (emptyMapping) { - ImmutableOpenMap.Builder().build() + mapOf() } else { val mappingSourceMap = createParser( XContentType.JSON.xContent(), javaClass.classLoader.getResource("mappings/kibana-sample-data.json").readText() ).map() val mappingMetadata = MappingMetadata("_doc", mappingSourceMap) // it seems it still expects a type, i.e. _doc now - ImmutableOpenMap.Builder() - .fPut(indexName, mappingMetadata) - .build() + mapOf(indexName to mappingMetadata) } return GetMappingsResponse(mappings) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt index b4bebcf11..5a3c016d2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt @@ -14,7 +14,6 @@ import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.opensearch.action.index.IndexResponse import org.opensearch.cluster.SnapshotsInProgress import org.opensearch.common.UUIDs -import org.opensearch.common.collect.ImmutableOpenMap import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.core.xcontent.NamedXContentRegistry @@ -242,7 +241,7 @@ fun mockInProgressSnapshotInfo( emptyList(), randomNonNegativeLong(), randomNonNegativeLong(), - ImmutableOpenMap.of(), + mapOf(), "", mapOf("sm_policy" to "daily-snapshot"), Version.CURRENT,