Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
class ConvertIndexToRemoteAction(
val repository: String,
val snapshot: String,
val includeAliases: Boolean = false,
val ignoreIndexSettings: String = "",
val numberOfReplicas: Int = 0,
index: Int,
) : Action(name, index) {

companion object {
const val name = "convert_index_to_remote"
const val REPOSITORY_FIELD = "repository"
const val SNAPSHOT_FIELD = "snapshot"
const val INCLUDE_ALIASES_FIELD = "include_aliases"
const val IGNORE_INDEX_SETTINGS_FIELD = "ignore_index_settings"
const val NUMBER_OF_REPLICAS_FIELD = "number_of_replicas"
}

private val attemptRestoreStep = AttemptRestoreStep(this)
Expand All @@ -37,12 +43,18 @@ class ConvertIndexToRemoteAction(
builder.startObject(type)
builder.field(REPOSITORY_FIELD, repository)
builder.field(SNAPSHOT_FIELD, snapshot)
builder.field(INCLUDE_ALIASES_FIELD, includeAliases)
builder.field(IGNORE_INDEX_SETTINGS_FIELD, ignoreIndexSettings)
builder.field(NUMBER_OF_REPLICAS_FIELD, numberOfReplicas)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
out.writeString(repository)
out.writeString(snapshot)
out.writeBoolean(includeAliases)
out.writeString(ignoreIndexSettings)
out.writeInt(numberOfReplicas)
Comment on lines +55 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a version check here to maintain BWC during rolling upgrades.

out.writeInt(actionIndex)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.IGNORE_INDEX_SETTINGS_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.INCLUDE_ALIASES_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.NUMBER_OF_REPLICAS_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.REPOSITORY_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction.Companion.SNAPSHOT_FIELD
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
Expand All @@ -18,13 +21,19 @@ class ConvertIndexToRemoteActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val repository = sin.readString()
val snapshot = sin.readString()
val includeAliases = sin.readBoolean()
val ignoreIndexSettings = sin.readString()
val numberOfReplicas = sin.readInt()
Comment on lines +24 to +26
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, we need a version check here to maintain BWC during rolling upgrades.

val index = sin.readInt()
return ConvertIndexToRemoteAction(repository, snapshot, index)
return ConvertIndexToRemoteAction(repository, snapshot, includeAliases, ignoreIndexSettings, numberOfReplicas, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var repository: String? = null
var snapshot: String? = null
var includeAliases: Boolean = false
var ignoreIndexSettings: String = ""
var numberOfReplicas: Int = 0

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -34,13 +43,19 @@ class ConvertIndexToRemoteActionParser : ActionParser() {
when (fieldName) {
REPOSITORY_FIELD -> repository = xcp.text()
SNAPSHOT_FIELD -> snapshot = xcp.text()
INCLUDE_ALIASES_FIELD -> includeAliases = xcp.booleanValue()
IGNORE_INDEX_SETTINGS_FIELD -> ignoreIndexSettings = xcp.text()
NUMBER_OF_REPLICAS_FIELD -> numberOfReplicas = xcp.intValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ConvertIndexToRemoteAction.")
}
}

return ConvertIndexToRemoteAction(
repository = requireNotNull(repository) { "ConvertIndexToRemoteAction repository must be specified" },
snapshot = requireNotNull(snapshot) { "ConvertIndexToRemoteAction snapshot must be specified" },
includeAliases = includeAliases,
ignoreIndexSettings = ignoreIndexSettings,
numberOfReplicas = numberOfReplicas,
index = index,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,20 @@ import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
import org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS
import org.opensearch.common.settings.Settings
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.action.ConvertIndexToRemoteAction
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
Expand All @@ -34,7 +41,7 @@ class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(
private var info: Map<String, Any>? = null
private var snapshotName: String? = null

@Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount", "LongMethod")
@Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount", "LongMethod", "NestedBlockDepth")
override suspend fun execute(): Step {
val context = this.context ?: return this
val managedIndexMetadata = context.metadata
Expand All @@ -44,7 +51,7 @@ class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(
val snapshot = action.snapshot

try {
val mutableInfo = mutableMapOf<String, String>()
val mutableInfo = mutableMapOf<String, Any>()
val snapshotScript = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, snapshot, mapOf())
val defaultSnapshotPattern = snapshot.ifBlank { indexName }
val snapshotPattern = compileTemplate(snapshotScript, managedIndexMetadata, defaultSnapshotPattern, scriptService)
Expand Down Expand Up @@ -86,29 +93,17 @@ class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(
// Use the snapshot name from the selected SnapshotInfo
snapshotName = latestSnapshotInfo.snapshotId().name

// Proceed with the restore operation
val restoreSnapshotRequest = RestoreSnapshotRequest(repository, snapshotName)
.indices(indexName)
.storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.renamePattern("^(.*)\$")
.renameReplacement("$1_remote")
.waitForCompletion(false)
val response: RestoreSnapshotResponse = context.client.admin().cluster().suspendUntil {
restoreSnapshot(restoreSnapshotRequest, it)
}
val remoteIndexName = "${indexName}_remote"

// Check if remote index exists
val remoteIndexExists = checkRemoteIndexExists(context, remoteIndexName)

when (response.status()) {
RestStatus.ACCEPTED, RestStatus.OK -> {
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = getSuccessMessage(indexName)
}
else -> {
val message = getFailedMessage(indexName, "Unexpected response status: ${response.status()}")
logger.warn("$message - $response")
stepStatus = StepStatus.FAILED
mutableInfo["message"] = message
mutableInfo["cause"] = response.toString()
}
if (remoteIndexExists) {
// Restore completed, mark as completed
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = getSuccessMessage(indexName)
} else {
performRestore(context, indexName, repository, snapshotName, mutableInfo)
}
info = mutableInfo.toMap()
} catch (e: RemoteTransportException) {
Expand All @@ -127,6 +122,87 @@ class AttemptRestoreStep(private val action: ConvertIndexToRemoteAction) : Step(
return this
}

private suspend fun checkRemoteIndexExists(context: StepContext, remoteIndexName: String): Boolean =
try {
val existsResponse: IndicesExistsResponse = context.client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(remoteIndexName), it)
}
existsResponse.isExists
} catch (e: Exception) {
// Index doesn't exist yet
false
}

private suspend fun performRestore(
context: StepContext,
indexName: String,
repository: String,
snapshotName: String?,
mutableInfo: MutableMap<String, Any>,
) {
val remoteIndexName = "${indexName}_remote"
// Proceed with the restore operation
val restoreSnapshotRequest = RestoreSnapshotRequest(repository, snapshotName)
.indices(indexName)
.storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.renamePattern("^(.*)\$")
.renameReplacement("$1_remote")
.waitForCompletion(false)
.includeAliases(action.includeAliases)
.ignoreIndexSettings(action.ignoreIndexSettings)

// Set number_of_replicas (defaults to 0 if not specified)
val indexSettings = Settings.builder()
.put(SETTING_NUMBER_OF_REPLICAS, action.numberOfReplicas)
.build()
restoreSnapshotRequest.indexSettings(indexSettings)

val response: RestoreSnapshotResponse = context.client.admin().cluster().suspendUntil {
restoreSnapshot(restoreSnapshotRequest, it)
}

when (response.status()) {
RestStatus.ACCEPTED, RestStatus.OK -> {
deleteOriginalIndex(context, indexName, mutableInfo)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the snapshot restore is not waiting for completion, I worry delete index here would disrupt the search.

I see you mentioned this:

the action automatically deletes the original index once the restore operation is accepted (status: ACCEPTED or OK), ensuring a clean conversion process where only the remote index remains.

But I think we should keep this an option for user, and disabled by default.

// Mark as waiting for completion
stepStatus = StepStatus.CONDITION_NOT_MET
mutableInfo["message"] = "Waiting for remote index [$remoteIndexName] to be created"
logger.info("Restore accepted for snapshot [$snapshotName], waiting for remote index [$remoteIndexName] to be created")
}
else -> {
val message = getFailedMessage(indexName, "Unexpected response status: ${response.status()}")
logger.warn("$message - $response")
stepStatus = StepStatus.FAILED
mutableInfo["message"] = message
mutableInfo["cause"] = response.toString()
}
}
}

private suspend fun deleteOriginalIndex(
context: StepContext,
indexName: String,
mutableInfo: MutableMap<String, Any>,
) {
// Restore accepted, delete original index
try {
val deleteResponse: AcknowledgedResponse = context.client.admin().indices().suspendUntil {
delete(DeleteIndexRequest(indexName), it)
}
if (deleteResponse.isAcknowledged) {
logger.info("Successfully deleted original index [$indexName] after restore was accepted")
mutableInfo["deleted_original_index"] = true
} else {
logger.warn("Delete request for original index [$indexName] was not acknowledged")
mutableInfo["deleted_original_index"] = false
}
} catch (e: Exception) {
logger.warn("Failed to delete original index [$indexName] after restore was accepted: ${e.message}", e)
mutableInfo["deleted_original_index"] = false
mutableInfo["delete_error"] = e.message ?: "Unknown error"
}
}

private fun compileTemplate(
template: Script,
managedIndexMetaData: ManagedIndexMetaData,
Expand Down
11 changes: 10 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 24
"schema_version": 25
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -226,6 +226,15 @@
},
"snapshot": {
"type": "keyword"
},
"include_aliases": {
"type": "boolean"
},
"ignore_index_settings": {
"type": "keyword"
},
"number_of_replicas": {
"type": "integer"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {
val configSchemaVersion = 24
val configSchemaVersion = 25
val historySchemaVersion = 7

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fun randomTemplateScript(

fun randomSnapshotActionConfig(repository: String = "repo", snapshot: String = "sp"): SnapshotAction = SnapshotAction(repository, snapshot, index = 0)

fun randomRestoreActionConfig(repository: String = "repo", snapshot: String = "sp"): ConvertIndexToRemoteAction = ConvertIndexToRemoteAction(repository, snapshot, index = 0)
fun randomRestoreActionConfig(repository: String = "repo", snapshot: String = "sp"): ConvertIndexToRemoteAction = ConvertIndexToRemoteAction(repository, snapshot, includeAliases = false, ignoreIndexSettings = "", numberOfReplicas = 0, index = 0)

/**
* Helper functions for creating a random Conditions object
Expand Down Expand Up @@ -471,6 +471,11 @@ fun AliasAction.toJsonString(): String {
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun ConvertIndexToRemoteAction.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun ISMTemplate.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Loading