diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt index 2523bb76f..eae951827 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt @@ -103,17 +103,20 @@ object SMRunner : // creation, deletion workflow have to be executed sequentially, // because they are sharing the same metadata document. - SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + val stateMachine = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) .handlePolicyChange() - .currentState(metadata.creation.currentState) - .next(creationTransitions) - .apply { - val deleteMetadata = metadata.deletion - if (deleteMetadata != null) { - this.currentState(deleteMetadata.currentState) - .next(deletionTransitions) - } - } + + // Execute creation workflow if it exists + if (metadata.creation != null) { + stateMachine.currentState(metadata.creation.currentState) + .next(creationTransitions) + } + + // Execute deletion workflow if it exists + if (metadata.deletion != null) { + stateMachine.currentState(metadata.deletion.currentState) + .next(deletionTransitions) + } } finally { if (!releaseLockForScheduledJob(context, lock)) { log.error("Could not release lock [${lock.lockId}] for ${job.id}.") @@ -155,13 +158,14 @@ object SMRunner : id = smPolicyNameToMetadataDocId(smDocIdToPolicyName(job.id)), policySeqNo = job.seqNo, policyPrimaryTerm = job.primaryTerm, - creation = - SMMetadata.WorkflowMetadata( - SMState.CREATION_START, - SMMetadata.Trigger( - time = job.creation.schedule.getNextExecutionTime(now), - ), - ), + creation = job.creation?.let { + SMMetadata.WorkflowMetadata( + SMState.CREATION_START, + SMMetadata.Trigger( + time = job.creation.schedule.getNextExecutionTime(now), + ), + ) + }, deletion = job.deletion?.let { SMMetadata.WorkflowMetadata( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt index b17ec5493..9d346cd40 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt @@ -17,6 +17,7 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse +import org.opensearch.common.regex.Regex import org.opensearch.common.time.DateFormatter import org.opensearch.common.time.DateFormatters import org.opensearch.common.unit.TimeValue @@ -240,7 +241,7 @@ suspend fun Client.getSnapshots( snapshotMissingMsg: String?, exceptionMsg: String, ): GetSnapshotsResult { - val snapshots = + var snapshots = try { getSnapshots( name, @@ -257,7 +258,19 @@ suspend fun Client.getSnapshots( cause = ex, ) return GetSnapshotsResult(true, emptyList(), metadataBuilder) - }.filterBySMPolicyInSnapshotMetadata(job.policyName) + } + + // Parse CSV patterns from name and implement pattern-based filtering + val patterns = name.split(",").map { it.trim() } + val policyNamePattern = "${job.policyName}*" + // Filter other snapshots by policy metadata + val otherPatternSnapshots = snapshots.filter { snapshot -> + patterns.any { pattern -> + pattern != policyNamePattern && Regex.simpleMatch(pattern, snapshot.snapshotId().name) + } + } + val policySnapshots = snapshots.filterBySMPolicyInSnapshotMetadata(job.policyName) + snapshots = policySnapshots + otherPatternSnapshots return GetSnapshotsResult(false, snapshots, metadataBuilder) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt index f72d7e9ad..db22f3e83 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt @@ -162,7 +162,7 @@ class SMStateMachine( val retry = when (result.workflowType) { WorkflowType.CREATION -> { - metadata.creation.retry + metadata.creation?.retry } WorkflowType.DELETION -> { metadata.deletion?.retry @@ -249,7 +249,11 @@ class SMStateMachine( val metadataToSave = SMMetadata.Builder(metadata) .setSeqNoPrimaryTerm(job.seqNo, job.primaryTerm) - .setNextCreationTime(job.creation.schedule.getNextExecutionTime(now)) + + val creation = job.creation + creation?.let { + metadataToSave.setNextCreationTime(creation.schedule.getNextExecutionTime(now)) + } val deletion = job.deletion deletion?.let { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt index 44e877588..a8b1c0942 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt @@ -38,7 +38,14 @@ object CreatingState : State { SMMetadata.Builder(metadata) .workflow(WorkflowType.CREATION) - var snapshotName: String? = metadata.creation.started?.first() + if (job.creation == null) { + log.warn("Policy creation config becomes null before trying to create snapshot. Reset.") + return SMResult.Fail( + metadataBuilder.resetCreation(), WorkflowType.CREATION, true, + ) + } + + var snapshotName: String? = metadata.creation?.started?.first() // Check if there's already a snapshot created by SM in current execution period. // So that this State can be executed idempotent. @@ -54,8 +61,8 @@ object CreatingState : State { } val getSnapshots = getSnapshotsResult.snapshots - val latestExecutionStartTime = job.creation.schedule.getPeriodStartingAt(null).v1() - snapshotName = checkCreatedSnapshots(latestExecutionStartTime, getSnapshots) + val latestExecutionStartTime = job.creation?.schedule?.getPeriodStartingAt(null)?.v1() + snapshotName = latestExecutionStartTime?.let { checkCreatedSnapshots(it, getSnapshots) } if (snapshotName != null) { log.info("Already created snapshot [$snapshotName] during this execution period starting at $latestExecutionStartTime.") metadataBuilder.setLatestExecution( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetState.kt index 34c5414d4..84ec21769 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetState.kt @@ -11,10 +11,12 @@ import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State import org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata import org.opensearch.indexmanagement.snapshotmanagement.tryUpdatingNextExecutionTime +import java.time.Instant.now object CreationConditionMetState : State { override val continuous = true + @Suppress("ReturnCount") override suspend fun execute(context: SMStateMachine): SMResult { val job = context.job val metadata = context.metadata @@ -24,15 +26,35 @@ object CreationConditionMetState : State { SMMetadata.Builder(metadata) .workflow(WorkflowType.CREATION) - val nextCreationTime = metadata.creation.trigger.time - val updateNextTimeResult = - tryUpdatingNextExecutionTime( - metadataBuilder, nextCreationTime, job.creation.schedule, WorkflowType.CREATION, log, + if (job.creation == null) { + log.warn("Policy creation config becomes null before trying to create snapshot. Reset.") + return SMResult.Fail( + metadataBuilder.resetCreation(), WorkflowType.CREATION, true, ) - if (!updateNextTimeResult.updated) { - return SMResult.Stay(metadataBuilder) } - metadataBuilder = updateNextTimeResult.metadataBuilder + + // if job.creation != null, then metadata.creation.trigger.time should already be + // initialized or handled in handlePolicyChange before executing this state. + val nextCreationTime = if (metadata.creation == null) { + val nextTime = job.creation.schedule.getNextExecutionTime(now()) + nextTime?.let { metadataBuilder.setNextCreationTime(it) } + nextTime + } else { + metadata.creation.trigger.time + } + + nextCreationTime?.let { creationTime -> + job.creation.schedule.let { schedule -> + val updateNextTimeResult = + tryUpdatingNextExecutionTime( + metadataBuilder, creationTime, schedule, WorkflowType.CREATION, log, + ) + if (!updateNextTimeResult.updated) { + return SMResult.Stay(metadataBuilder) + } + metadataBuilder = updateNextTimeResult.metadataBuilder + } + } return SMResult.Next(metadataBuilder) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt index d9c41f197..11c27d24b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt @@ -21,7 +21,7 @@ import java.time.Instant.now object CreationFinishedState : State { override val continuous = true - @Suppress("ReturnCount", "LongMethod", "NestedBlockDepth") + @Suppress("ReturnCount", "LongMethod", "NestedBlockDepth", "CyclomaticComplexMethod") override suspend fun execute(context: SMStateMachine): SMResult { val client = context.client val job = context.job @@ -32,8 +32,8 @@ object CreationFinishedState : State { SMMetadata.Builder(metadata) .workflow(WorkflowType.CREATION) - metadata.creation.started?.first()?.let { snapshotName -> - if (metadata.creation.latestExecution == null) { + metadata.creation?.started?.first()?.let { snapshotName -> + if (metadata.creation?.latestExecution == null) { // This should not happen log.error("latest_execution is null while checking if snapshot [$snapshotName] creation has finished. Reset.") metadataBuilder.resetWorkflow() @@ -75,8 +75,8 @@ object CreationFinishedState : State { job.notificationConfig?.sendCreationNotification(client, job.policyName, creationMessage, job.user, log) } SnapshotState.IN_PROGRESS -> { - job.creation.timeLimit?.let { timeLimit -> - if (timeLimit.isExceed(metadata.creation.latestExecution.startTime)) { + job.creation?.timeLimit?.let { timeLimit -> + if (timeLimit.isExceed(metadata.creation?.latestExecution?.startTime)) { return timeLimitExceeded( timeLimit, metadataBuilder, WorkflowType.CREATION, log, ) @@ -98,18 +98,22 @@ object CreationFinishedState : State { // if now is after next creation time, update nextCreationTime to next execution schedule // TODO may want to notify user that we skipped the execution because snapshot creation time is longer than execution schedule - val result = - tryUpdatingNextExecutionTime( - metadataBuilder, metadata.creation.trigger.time, job.creation.schedule, - WorkflowType.CREATION, log, - ) - if (result.updated) { - metadataBuilder = result.metadataBuilder + metadata.creation?.trigger?.time?.let { triggerTime -> + job.creation?.schedule?.let { schedule -> + val result = + tryUpdatingNextExecutionTime( + metadataBuilder, triggerTime, schedule, + WorkflowType.CREATION, log, + ) + if (result.updated) { + metadataBuilder = result.metadataBuilder + } + } } } val metadataToSave = metadataBuilder.build() - if (metadataToSave.creation.started != null) { + if (metadataToSave.creation?.started != null) { return SMResult.Stay(metadataBuilder) } return SMResult.Next(metadataBuilder) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt index 59291f5c0..c2addaf81 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt @@ -49,9 +49,14 @@ object DeletingState : State { val snapshotsToDelete: List + var snapshotPattern = job.policyName + "*" + + if (job.deletion?.snapshotPattern != null) { + snapshotPattern += "," + job.deletion.snapshotPattern + } val getSnapshotsRes = client.getSnapshots( - job, job.policyName + "*", metadataBuilder, log, + job, snapshotPattern, metadataBuilder, log, getSnapshotsMissingMessage(), getSnapshotsErrorMessage(), ) @@ -59,11 +64,13 @@ object DeletingState : State { if (getSnapshotsRes.failed) { return SMResult.Fail(metadataBuilder, WorkflowType.DELETION) } - val getSnapshots = getSnapshotsRes.snapshots + val snapshots = getSnapshotsRes.snapshots + .distinctBy { it.snapshotId().name } + .filter { it.state() != SnapshotState.IN_PROGRESS } snapshotsToDelete = filterByDeleteCondition( - getSnapshots.filter { it.state() != SnapshotState.IN_PROGRESS }, + snapshots, job.deletion.condition, log, ) if (snapshotsToDelete.isNotEmpty()) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt index d36f318e9..a171403de 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt @@ -38,9 +38,14 @@ object DeletionFinishedState : State { return@let } + var snapshotPattern = job.policyName + "*" + + if (job.deletion?.snapshotPattern != null) { + snapshotPattern += "," + job.deletion.snapshotPattern + } val getSnapshotsRes = client.getSnapshots( - job, "${job.policyName}*", metadataBuilder, log, + job, snapshotPattern, metadataBuilder, log, getSnapshotMissingMessageInDeletionWorkflow(), getSnapshotExceptionInDeletionWorkflow(snapshotsStartedDeletion), ) @@ -48,9 +53,9 @@ object DeletionFinishedState : State { if (getSnapshotsRes.failed) { return SMResult.Fail(metadataBuilder, WorkflowType.DELETION) } - val getSnapshots = getSnapshotsRes.snapshots + val snapshots = getSnapshotsRes.snapshots.distinctBy { it.snapshotId().name } - val existingSnapshotsNameSet = getSnapshots.map { it.snapshotId().name }.toSet() + val existingSnapshotsNameSet = snapshots.map { it.snapshotId().name }.toSet() val remainingSnapshotsName = existingSnapshotsNameSet intersect snapshotsStartedDeletion.toSet() if (remainingSnapshotsName.isEmpty()) { val deletionMessage = "Snapshot(s) $snapshotsStartedDeletion deletion has finished." diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt index 761ae5fac..0d634566d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.snapshotmanagement.model +import org.opensearch.Version import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable @@ -38,7 +39,7 @@ typealias InfoType = Map data class SMMetadata( val policySeqNo: Long, val policyPrimaryTerm: Long, - val creation: WorkflowMetadata, + val creation: WorkflowMetadata?, val deletion: WorkflowMetadata?, val id: String = NO_ID, val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, @@ -51,7 +52,7 @@ data class SMMetadata( builder.field(NAME_FIELD, smMetadataDocIdToPolicyName(id)) .field(POLICY_SEQ_NO_FIELD, policySeqNo) .field(POLICY_PRIMARY_TERM_FIELD, policyPrimaryTerm) - .field(CREATION_FIELD, creation) + .optionalField(CREATION_FIELD, creation) .optionalField(DELETION_FIELD, deletion) if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject() return builder.endObject() @@ -92,7 +93,7 @@ data class SMMetadata( return SMMetadata( policySeqNo = requireNotNull(policySeqNo) { "policy_seq_no field must not be null" }, policyPrimaryTerm = requireNotNull(policyPrimaryTerm) { "policy_primary_term field must not be null" }, - creation = requireNotNull(creation) { "creation field must not be null" }, + creation = creation, deletion = deletion, id = id, seqNo = seqNo, @@ -112,7 +113,12 @@ data class SMMetadata( constructor(sin: StreamInput) : this( policySeqNo = sin.readLong(), policyPrimaryTerm = sin.readLong(), - creation = WorkflowMetadata(sin), + creation = if (sin.version.onOrAfter(Version.V_3_3_0)) { + sin.readOptionalWriteable { WorkflowMetadata(it) } + } else { + // For older versions, creation will always exist + WorkflowMetadata(sin) + }, deletion = sin.readOptionalWriteable { WorkflowMetadata(it) }, id = sin.readString(), seqNo = sin.readLong(), @@ -122,7 +128,12 @@ data class SMMetadata( override fun writeTo(out: StreamOutput) { out.writeLong(policySeqNo) out.writeLong(policyPrimaryTerm) - creation.writeTo(out) + if (out.version.onOrAfter(Version.V_3_3_0)) { + out.writeOptionalWriteable(creation) + } else { + // For older versions, always creation will be present + requireNotNull(creation) { "creation must not be null for versions before 3.2.0" }.writeTo(out) + } out.writeOptionalWriteable(deletion) out.writeString(id) out.writeLong(seqNo) @@ -431,7 +442,7 @@ data class SMMetadata( metadata = metadata.copy( creation = - metadata.creation.copy( + metadata.creation?.copy( currentState = state, ), ) @@ -452,9 +463,9 @@ data class SMMetadata( // Reset the workflow of this execution period so SM can // go to execute the next fun resetWorkflow(): Builder { - var creationCurrentState = metadata.creation.currentState - var startedCreation = metadata.creation.started - var creationRetry = metadata.creation.retry + var creationCurrentState = metadata.creation?.currentState + var startedCreation = metadata.creation?.started + var creationRetry = metadata.creation?.retry var deletionCurrentState = metadata.deletion?.currentState var startedDeletion = metadata.deletion?.started var deletionRetry = metadata.deletion?.retry @@ -474,8 +485,8 @@ data class SMMetadata( metadata = metadata.copy( creation = - metadata.creation.copy( - currentState = creationCurrentState, + metadata.creation?.copy( + currentState = creationCurrentState!!, started = startedCreation, retry = creationRetry, ), @@ -499,6 +510,14 @@ data class SMMetadata( return this } + fun resetCreation(): Builder { + metadata = + metadata.copy( + creation = null, + ) + return this + } + // Use this **first** to update metadata, because it depends on started field // So if you change started first, this could behave wrongly @Suppress("LongParameterList") @@ -536,8 +555,8 @@ data class SMMetadata( status = status, info = Info( - message = if (updateMessage) messageWithTime else metadata.creation.latestExecution?.info?.message, - cause = if (updateCause) causeWithTime else metadata.creation.latestExecution?.info?.cause, + message = if (updateMessage) messageWithTime else metadata.creation?.latestExecution?.info?.message, + cause = if (updateCause) causeWithTime else metadata.creation?.latestExecution?.info?.cause, ), endTime = endTime, ), @@ -548,7 +567,7 @@ data class SMMetadata( when (workflowType) { WorkflowType.CREATION -> { metadata.copy( - creation = getUpdatedWorkflowMetadata(metadata.creation), + creation = metadata.creation?.let { getUpdatedWorkflowMetadata(it) }, ) } WorkflowType.DELETION -> { @@ -573,7 +592,7 @@ data class SMMetadata( fun getStartedSnapshots(): List? = when (workflowType) { WorkflowType.CREATION -> { - metadata.creation.started + metadata.creation?.started } WorkflowType.DELETION -> { metadata.deletion?.started @@ -586,7 +605,7 @@ data class SMMetadata( metadata = metadata.copy( creation = - metadata.creation.copy( + metadata.creation?.copy( retry = Retry(count = count), ), ) @@ -608,11 +627,11 @@ data class SMMetadata( fun resetRetry(): Builder { when (workflowType) { WorkflowType.CREATION -> { - if (metadata.creation.retry != null) { + if (metadata.creation?.retry != null) { metadata = metadata.copy( creation = - metadata.creation.copy( + metadata.creation?.copy( retry = null, ), ) @@ -643,16 +662,16 @@ data class SMMetadata( } fun setNextCreationTime(time: Instant): Builder { - metadata = - metadata.copy( - creation = - metadata.creation.copy( - trigger = - metadata.creation.trigger.copy( - time = time, - ), - ), - ) + metadata = metadata.copy( + creation = metadata.creation?.let { creation -> + creation.copy( + trigger = creation.trigger.copy(time = time), + ) + } ?: WorkflowMetadata( + SMState.CREATION_START, + Trigger(time = time), + ), + ) return this } @@ -660,7 +679,7 @@ data class SMMetadata( metadata = metadata.copy( creation = - metadata.creation.copy( + metadata.creation?.copy( started = if (snapshot == null) null else listOf(snapshot), ), ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt index d52e90373..3c7e6de49 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.snapshotmanagement.model +import org.opensearch.Version import org.opensearch.common.unit.TimeValue import org.opensearch.commons.authuser.User import org.opensearch.core.common.io.stream.StreamInput @@ -41,7 +42,7 @@ data class SMPolicy( val id: String, val description: String? = null, val schemaVersion: Long, - val creation: Creation, + val creation: Creation?, val deletion: Deletion?, val snapshotConfig: Map, val jobEnabled: Boolean, @@ -55,10 +56,13 @@ data class SMPolicy( ) : ScheduledJobParameter, Writeable { init { + require(creation != null || deletion != null) { + "Must provide either creation or deletion configuration." + } require(snapshotConfig["repository"] != null && snapshotConfig["repository"] != "") { "Must provide the repository in snapshot config." } - require(creation.schedule.getNextExecutionTime(now()) != null) { + require(creation == null || creation.schedule.getNextExecutionTime(now()) != null) { "Next execution time from the creation schedule is null, please provide a valid cron expression." } require(deletion == null || (deletion.schedule.getNextExecutionTime(now()) != null)) { @@ -90,7 +94,7 @@ data class SMPolicy( builder.field(NAME_FIELD, smDocIdToPolicyName(id)) // for searching policy by name .optionalField(DESCRIPTION_FIELD, description) .field(SCHEMA_VERSION_FIELD, schemaVersion) - .field(CREATION_FIELD, creation) + .optionalField(CREATION_FIELD, creation) .optionalField(DELETION_FIELD, deletion) .field(SNAPSHOT_CONFIG_FIELD, snapshotConfig) .field(SCHEDULE_FIELD, jobSchedule) @@ -180,9 +184,11 @@ data class SMPolicy( schedule = IntervalSchedule(now(), 1, ChronoUnit.MINUTES) } - require(creation != null) { "Must provide the creation configuration." } - // If user doesn't provide delete schedule, use the creation schedule + // If user doesn't provide delete schedule, use the creation schedule if available if (deletion != null && !deletion.scheduleProvided) { + if (creation == null) { + throw IllegalArgumentException("Schedule not provided for neither deletion policy nor creation policy") + } deletion = deletion.copy( schedule = creation.schedule, @@ -219,7 +225,12 @@ data class SMPolicy( constructor(sin: StreamInput) : this( description = sin.readOptionalString(), schemaVersion = sin.readLong(), - creation = Creation(sin), + creation = if (sin.version.onOrAfter(Version.V_3_3_0)) { + sin.readOptionalWriteable { Creation(it) } + } else { + // Before V_3_3_0, creation was always required + Creation(sin) + }, deletion = sin.readOptionalWriteable { Deletion(it) }, snapshotConfig = sin.readMap() as Map, jobLastUpdateTime = sin.readInstant(), @@ -236,7 +247,12 @@ data class SMPolicy( override fun writeTo(out: StreamOutput) { out.writeOptionalString(description) out.writeLong(schemaVersion) - creation.writeTo(out) + if (out.version.onOrAfter(Version.V_3_3_0)) { + out.writeOptionalWriteable(creation) + } else { + // Before V_3_3_0, creation was always required + creation?.writeTo(out) ?: error("Creation cannot be null for versions before V_3_3_0") + } out.writeOptionalWriteable(deletion) out.writeMap(snapshotConfig) out.writeInstant(jobLastUpdateTime) @@ -301,22 +317,26 @@ data class SMPolicy( val scheduleProvided: Boolean = true, val condition: DeleteCondition, val timeLimit: TimeValue? = null, + val snapshotPattern: String? = null, ) : Writeable, ToXContent { override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder = builder.startObject() .field(SCHEDULE_FIELD, schedule) .field(CONDITION_FIELD, condition) .optionalField(TIME_LIMIT_FIELD, timeLimit) + .optionalField(SNAPSHOT_PATTERN_FIELD, snapshotPattern) .endObject() companion object { const val SCHEDULE_FIELD = "schedule" const val CONDITION_FIELD = "condition" + const val SNAPSHOT_PATTERN_FIELD = "snapshot_pattern" fun parse(xcp: XContentParser): Deletion { var schedule: Schedule? = null var timeLimit: TimeValue? = null var condition: DeleteCondition? = null + var snapshotPattern: String? = null var scheduleProvided = true ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) @@ -328,6 +348,7 @@ data class SMPolicy( SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp) TIME_LIMIT_FIELD -> timeLimit = TimeValue.parseTimeValue(xcp.text(), TIME_LIMIT_FIELD) CONDITION_FIELD -> condition = DeleteCondition.parse(xcp) + SNAPSHOT_PATTERN_FIELD -> snapshotPattern = xcp.text() } } @@ -342,6 +363,7 @@ data class SMPolicy( scheduleProvided = scheduleProvided, timeLimit = timeLimit, condition = requireNotNull(condition) { "$CONDITION_FIELD must not be null." }, + snapshotPattern = snapshotPattern, ) } } @@ -350,12 +372,16 @@ data class SMPolicy( schedule = CronSchedule(sin), timeLimit = sin.readOptionalTimeValue(), condition = DeleteCondition(sin), + snapshotPattern = if (sin.version.onOrAfter(Version.V_3_3_0)) sin.readOptionalString() else null, ) override fun writeTo(out: StreamOutput) { schedule.writeTo(out) out.writeOptionalTimeValue(timeLimit) condition.writeTo(out) + if (out.version.onOrAfter(Version.V_3_3_0)) { + out.writeOptionalString(snapshotPattern) + } } } diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 845bfe34c..6250ba15d 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1497,6 +1497,9 @@ "type": "integer" } } + }, + "snapshot_pattern": { + "type": "keyword" } } }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/MocksTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/MocksTestCase.kt index eef228197..faba2fef9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/MocksTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/MocksTestCase.kt @@ -22,6 +22,7 @@ import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.client.AdminClient import org.opensearch.transport.client.Client import org.opensearch.transport.client.ClusterAdminClient +import java.util.concurrent.atomic.AtomicInteger abstract class MocksTestCase : OpenSearchTestCase() { val client: Client = mock() @@ -107,4 +108,22 @@ abstract class MocksTestCase : OpenSearchTestCase() { } }.whenever(clusterAdminClient).getSnapshots(any(), any()) } + + fun mockGetSnapshotsCallFirstSuccessThenFailure( + firstResponse: ActionResponse, + secondException: Exception, + ) { + whenever(client.admin()).thenReturn(adminClient) + whenever(adminClient.cluster()).thenReturn(clusterAdminClient) + val callCount = AtomicInteger(0) + doAnswer { + val listener = it.getArgument>(1) + if (callCount.getAndIncrement() == 0) { + listener.onResponse(firstResponse) + } else { + listener.onFailure(secondException) + } + null + }.whenever(clusterAdminClient).getSnapshots(any(), any()) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/bwc/SMBackwardsCompatibilityIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/bwc/SMBackwardsCompatibilityIT.kt new file mode 100644 index 000000000..141c57764 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/bwc/SMBackwardsCompatibilityIT.kt @@ -0,0 +1,221 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.bwc + +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementRestTestCase +import java.util.Locale + +class SMBackwardsCompatibilityIT : SnapshotManagementRestTestCase() { + private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) + + private enum class ClusterType { + OLD, + MIXED, + UPGRADED, + ; + + companion object { + fun parse(value: String): ClusterType = when (value) { + "old_cluster" -> OLD + "mixed_cluster" -> MIXED + "upgraded_cluster" -> UPGRADED + else -> throw AssertionError("Unknown cluster type: $value") + } + } + } + + private fun getPluginUri(): String = when (CLUSTER_TYPE) { + ClusterType.OLD -> "_nodes/$CLUSTER_NAME-0/plugins" + ClusterType.MIXED -> { + when (System.getProperty("tests.rest.bwcsuite_round")) { + "second" -> "_nodes/$CLUSTER_NAME-1/plugins" + "third" -> "_nodes/$CLUSTER_NAME-2/plugins" + else -> "_nodes/$CLUSTER_NAME-0/plugins" + } + } + ClusterType.UPGRADED -> "_nodes/plugins" + } + + companion object { + private val CLUSTER_TYPE = ClusterType.parse(System.getProperty("tests.rest.bwcsuite")) + private val CLUSTER_NAME = System.getProperty("tests.clustername") + } + + override fun preserveIndicesUponCompletion(): Boolean = true + + override fun preserveReposUponCompletion(): Boolean = true + + override fun preserveTemplatesUponCompletion(): Boolean = true + + override fun restClientSettings(): Settings = Settings.builder() + .put(super.restClientSettings()) + // increase the timeout here to 90 seconds to handle long waits for a green + // cluster health. the waits for green need to be longer than a minute to + // account for delayed shards + .put(CLIENT_SOCKET_TIMEOUT, "90s") + .build() + + @Throws(Exception::class) + @Suppress("UNCHECKED_CAST") + fun `test snapshot management backwards compatibility`() { + val traditionalPolicyName = "${testIndexName}_traditional" + val deletionOnlyPolicyName = "${testIndexName}_deletion_only" + val patternPolicyName = "${testIndexName}_pattern" + + val uri = getPluginUri() + val responseMap = getAsMap(uri)["nodes"] as Map> + for (response in responseMap.values) { + val plugins = response["plugins"] as List> + val pluginNames = plugins.map { plugin -> plugin["name"] }.toSet() + when (CLUSTER_TYPE) { + ClusterType.OLD -> { + assertTrue(pluginNames.contains("opendistro-index-management") || pluginNames.contains("opensearch-index-management")) + + createRepository("test-repo") + + // Create traditional policy with both creation and deletion (pre-3.2.0 format) + createTraditionalPolicy(traditionalPolicyName) + + // Verify traditional policy works + val traditionalPolicy = getSMPolicy(traditionalPolicyName) + assertNotNull("Traditional policy creation should exist", traditionalPolicy.creation) + assertNotNull("Traditional policy deletion should exist", traditionalPolicy.deletion) + } + ClusterType.MIXED -> { + assertTrue(pluginNames.contains("opensearch-index-management")) + + // Verify traditional policy still works during rolling upgrade + val traditionalPolicy = getSMPolicy(traditionalPolicyName) + assertNotNull("Traditional policy creation should exist", traditionalPolicy.creation) + assertNotNull("Traditional policy deletion should exist", traditionalPolicy.deletion) + } + ClusterType.UPGRADED -> { + assertTrue(pluginNames.contains("opensearch-index-management")) + + // Verify traditional policy still works after full upgrade + val traditionalPolicy = getSMPolicy(traditionalPolicyName) + assertNotNull("Traditional policy creation should exist", traditionalPolicy.creation) + assertNotNull("Traditional policy deletion should exist", traditionalPolicy.deletion) + + // Now test new features on upgraded cluster + + // Create deletion-only policy (3.2.0+ feature) + createDeletionOnlyPolicy(deletionOnlyPolicyName) + val deletionOnlyPolicy = getSMPolicy(deletionOnlyPolicyName) + assertNull("Deletion-only policy creation should be null", deletionOnlyPolicy.creation) + assertNotNull("Deletion-only policy deletion should exist", deletionOnlyPolicy.deletion) + + // Create policy with snapshot pattern (3.2.0+ feature) + createPatternPolicy(patternPolicyName) + val patternPolicy = getSMPolicy(patternPolicyName) + assertNotNull("Pattern policy creation should exist", patternPolicy.creation) + assertNotNull("Pattern policy deletion should exist", patternPolicy.deletion) + assertEquals("Pattern policy should have snapshot pattern", "external-backup-*", patternPolicy.deletion?.snapshotPattern) + } + } + break + } + } + + private fun createTraditionalPolicy(policyName: String) { + val policy = """ + { + "sm_policy": { + "name": "$policyName", + "description": "Traditional SM policy with creation and deletion", + "creation": { + "schedule": { + "cron": { + "expression": "0 1 * * *", + "timezone": "UTC" + } + } + }, + "deletion": { + "schedule": { + "cron": { + "expression": "0 2 * * *", + "timezone": "UTC" + } + }, + "condition": { + "max_age": "7d", + "min_count": 1 + } + }, + "snapshot_config": { + "repository": "test-repo" + } + } + } + """.trimIndent() + createSMPolicyJson(policy, policyName) + } + + private fun createDeletionOnlyPolicy(policyName: String) { + val policy = """ + { + "sm_policy": { + "name": "$policyName", + "description": "Deletion-only SM policy (3.2.0+ feature)", + "deletion": { + "schedule": { + "cron": { + "expression": "0 2 * * *", + "timezone": "UTC" + } + }, + "condition": { + "max_age": "14d", + "min_count": 2 + } + }, + "snapshot_config": { + "repository": "test-repo" + } + } + } + """.trimIndent() + createSMPolicyJson(policy, policyName) + } + + private fun createPatternPolicy(policyName: String) { + val policy = """ + { + "sm_policy": { + "name": "$policyName", + "description": "SM policy with snapshot pattern (3.2.0+ feature)", + "creation": { + "schedule": { + "cron": { + "expression": "0 1 * * *", + "timezone": "UTC" + } + } + }, + "deletion": { + "schedule": { + "cron": { + "expression": "0 2 * * *", + "timezone": "UTC" + } + }, + "condition": { + "max_age": "30d", + "min_count": 3 + }, + "snapshot_pattern": "external-backup-*" + }, + "snapshot_config": { + "repository": "test-repo" + } + } + } + """.trimIndent() + createSMPolicyJson(policy, policyName) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerTests.kt new file mode 100644 index 000000000..8d57426f8 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerTests.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement + +import org.opensearch.common.unit.TimeValue +import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState +import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata +import org.opensearch.test.OpenSearchTestCase +import java.time.Instant.now + +class SMRunnerTests : OpenSearchTestCase() { + fun `test sm runner with deletion only policy should execute deletion workflow`() { + val job = randomSMPolicy( + creationNull = true, // Deletion-only policy + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 3, + ) + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_START, + trigger = SMMetadata.Trigger(time = now()), + ), + ) + + // Verify the setup matches the condition + assertNull("Creation should be null for deletion-only policy", job.creation) + assertNotNull("Deletion should exist for deletion-only policy", job.deletion) + assertNull("Metadata creation should be null", metadata.creation) + assertNotNull("Metadata deletion should exist", metadata.deletion) + + assertTrue( + "Should have deletion-only policy setup", + metadata.creation == null && metadata.deletion != null, + ) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt index d4078288e..b12505796 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt @@ -38,7 +38,7 @@ import java.time.Instant.now abstract class SnapshotManagementRestTestCase : IndexManagementRestTestCase() { @After fun clearIndicesAfterEachTest() { - wipeAllIndices() + wipeAllIndices(skip = isBWCTest) } var timeout: Instant = Instant.ofEpochSecond(20) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt index 41fd90f21..fc6ead27f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt @@ -107,6 +107,8 @@ fun randomSMPolicy( deletionMaxAge: TimeValue? = null, deletionMinCount: Int = randomIntBetween(1, 5), deletionNull: Boolean = false, + creationNull: Boolean = false, + snapshotPattern: String? = null, snapshotConfig: MutableMap = mutableMapOf( "repository" to "repo", @@ -126,11 +128,14 @@ fun randomSMPolicy( schemaVersion = schemaVersion, jobEnabled = jobEnabled, jobLastUpdateTime = jobLastUpdateTime, - creation = - SMPolicy.Creation( - schedule = creationSchedule, - timeLimit = creationTimeLimit, - ), + creation = if (creationNull) { + null + } else { + SMPolicy.Creation( + schedule = creationSchedule, + timeLimit = creationTimeLimit, + ) + }, deletion = randomPolicyDeletion( deletionSchedule, @@ -139,6 +144,7 @@ fun randomSMPolicy( deletionMaxAge, deletionMinCount, deletionNull, + snapshotPattern, ), snapshotConfig = snapshotConfig, jobEnabledTime = if (jobEnabled) jobEnabledTime else null, @@ -156,6 +162,7 @@ fun randomPolicyDeletion( deletionMaxAge: TimeValue? = null, deletionMinCount: Int = randomIntBetween(1, 5), deletionNull: Boolean = false, + snapshotPattern: String? = null, ): SMPolicy.Deletion? { if (deletionNull) return null return SMPolicy.Deletion( @@ -167,6 +174,7 @@ fun randomPolicyDeletion( maxAge = deletionMaxAge, minCount = deletionMinCount, ), + snapshotPattern = snapshotPattern, ) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt index 7efa82bb8..ed78311e0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt @@ -52,7 +52,7 @@ open class SMStateMachineTests : MocksTestCase() { stateMachineSpy.currentState(currentState).next(creationTransitions) argumentCaptor().apply { verify(stateMachineSpy).updateMetadata(capture()) - assertEquals(nextStates!!.first(), firstValue.creation.currentState) + assertEquals(nextStates!!.first(), firstValue.creation?.currentState) } } @@ -96,9 +96,9 @@ open class SMStateMachineTests : MocksTestCase() { stateMachineSpy.currentState(currentState).next(creationTransitions) argumentCaptor().apply { verify(stateMachineSpy).updateMetadata(capture()) - assertEquals(currentState, firstValue.creation.currentState) - assertNull(firstValue.creation.started) - assertEquals(3, firstValue.creation.retry!!.count) + assertEquals(currentState, firstValue.creation?.currentState) + assertNull(firstValue.creation?.started) + assertEquals(3, firstValue.creation?.retry!!.count) } } @@ -302,4 +302,79 @@ open class SMStateMachineTests : MocksTestCase() { // Verify exception type and cause assertTrue(thrownException.cause is OpenSearchException) } + + fun `test state machine with deletion-only policy`() = runBlocking { + // Create deletion-only metadata manually + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_START, + trigger = SMMetadata.Trigger(time = now()), + ), + ) + val job = randomSMPolicy( + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 2, + ) + val stateMachine = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + // Test that deletion-only policy properly initializes + assertNull("Creation should be null for deletion-only policy", job.creation) + assertNotNull("Deletion should exist", job.deletion) + assertNull("Metadata creation should be null", metadata.creation) + assertNotNull("Metadata deletion should exist", metadata.deletion) + + // Test that metadata is properly structured for deletion-only workflow + assertEquals("Deletion state should be DELETION_START", SMState.DELETION_START, metadata.deletion?.currentState) + } + + fun `test state machine with creation and deletion workflows`() = runBlocking { + val metadata = randomSMMetadata( + creationCurrentState = SMState.CREATION_START, + deletionCurrentState = SMState.DELETION_START, + ) + val job = randomSMPolicy( + creationNull = false, + deletionMaxAge = TimeValue.timeValueDays(30), + deletionMinCount = 5, + ) + val stateMachine = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + // Test that both workflows are properly initialized + assertNotNull("Creation should exist for policy with both workflows", job.creation) + assertNotNull("Deletion should exist for policy with both workflows", job.deletion) + assertNotNull("Metadata creation should exist", metadata.creation) + assertNotNull("Metadata deletion should exist", metadata.deletion) + + assertEquals("Creation state should be CREATION_START", SMState.CREATION_START, metadata.creation?.currentState) + assertEquals("Deletion state should be DELETION_START", SMState.DELETION_START, metadata.deletion?.currentState) + } + + fun `test state machine with snapshot pattern in deletion`() = runBlocking { + // Create deletion-only metadata manually + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_START, + trigger = SMMetadata.Trigger(time = now()), + ), + ) + val job = randomSMPolicy( + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 1, + snapshotPattern = "external-*", + ) + val stateMachine = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + // Test that snapshot pattern is properly configured + assertEquals("Snapshot pattern should be set", "external-*", job.deletion?.snapshotPattern) + assertNull("Creation should be null for deletion-only policy with pattern", job.creation) + assertNotNull("Deletion should exist", job.deletion) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingStateTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingStateTests.kt index 59aaab726..00f40e64f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingStateTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingStateTests.kt @@ -35,8 +35,8 @@ class CreatingStateTests : MocksTestCase() { val result = SMState.CREATING.instance.execute(context) assertTrue("Execution result should be Next.", result is SMResult.Next) result as SMResult.Next - assertNotNull("Creation started field is initialized.", result.metadataToSave.build().creation.started) - assertEquals("Latest execution status is in_progress", SMMetadata.LatestExecution.Status.IN_PROGRESS, result.metadataToSave.build().creation.latestExecution!!.status) + assertNotNull("Creation started field is initialized.", result.metadataToSave.build().creation?.started) + assertEquals("Latest execution status is in_progress", SMMetadata.LatestExecution.Status.IN_PROGRESS, result.metadataToSave.build().creation?.latestExecution!!.status) } fun `test create snapshot exception`() = @@ -55,9 +55,9 @@ class CreatingStateTests : MocksTestCase() { val result = SMState.CREATING.instance.execute(context) assertTrue("Execution result should be Failure.", result is SMResult.Fail) result as SMResult.Fail - assertNull("Creation started field should not be initialized.", result.metadataToSave.build().creation.started) - assertEquals("Latest execution status is retrying", SMMetadata.LatestExecution.Status.RETRYING, result.metadataToSave.build().creation.latestExecution!!.status) - assertNotNull("Latest execution info should not be null", result.metadataToSave.build().creation.latestExecution!!.info) + assertNull("Creation started field should not be initialized.", result.metadataToSave.build().creation?.started) + assertEquals("Latest execution status is retrying", SMMetadata.LatestExecution.Status.RETRYING, result.metadataToSave.build().creation?.latestExecution!!.status) + assertNotNull("Latest execution info should not be null", result.metadataToSave.build().creation?.latestExecution!!.info) } fun `test snapshot already created in previous schedule`() = @@ -77,8 +77,8 @@ class CreatingStateTests : MocksTestCase() { val result = SMState.CREATING.instance.execute(context) assertTrue("Execution result should be Next.", result is SMResult.Next) result as SMResult.Next - assertEquals("Started create snapshot name is $snapshotName.", snapshotName, result.metadataToSave.build().creation.started!!.first()) - assertEquals("Latest execution status is in_progress", SMMetadata.LatestExecution.Status.IN_PROGRESS, result.metadataToSave.build().creation.latestExecution!!.status) + assertEquals("Started create snapshot name is $snapshotName.", snapshotName, result.metadataToSave.build().creation?.started!!.first()) + assertEquals("Latest execution status is in_progress", SMMetadata.LatestExecution.Status.IN_PROGRESS, result.metadataToSave.build().creation?.latestExecution!!.status) } fun `test snapshot already created but not in previous schedule`() = @@ -99,7 +99,7 @@ class CreatingStateTests : MocksTestCase() { val result = SMState.CREATING.instance.execute(context) assertTrue("Execution result should be Next.", result is SMResult.Next) result as SMResult.Next - assertNotEquals("Started create snapshot name should not be $snapshotName.", snapshotName, result.metadataToSave.build().creation.started!!.first()) + assertNotEquals("Started create snapshot name should not be $snapshotName.", snapshotName, result.metadataToSave.build().creation?.started!!.first()) } fun `test get snapshots exception while checking if snapshot already created`() = @@ -117,8 +117,8 @@ class CreatingStateTests : MocksTestCase() { val result = SMState.CREATING.instance.execute(context) assertTrue("Execution result should be Failure.", result is SMResult.Fail) result as SMResult.Fail - assertNull("Creation started field should not be initialized.", result.metadataToSave.build().creation.started) - assertEquals("Latest execution status is retrying", SMMetadata.LatestExecution.Status.RETRYING, result.metadataToSave.build().creation.latestExecution!!.status) - assertNotNull("Latest execution info should not be null", result.metadataToSave.build().creation.latestExecution!!.info) + assertNull("Creation started field should not be initialized.", result.metadataToSave.build().creation?.started) + assertEquals("Latest execution status is retrying", SMMetadata.LatestExecution.Status.RETRYING, result.metadataToSave.build().creation?.latestExecution!!.status) + assertNotNull("Latest execution info should not be null", result.metadataToSave.build().creation?.latestExecution!!.info) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetStateTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetStateTests.kt index 64ca24639..233c4c246 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetStateTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetStateTests.kt @@ -6,10 +6,12 @@ package org.opensearch.indexmanagement.snapshotmanagement.engine.states.creation import kotlinx.coroutines.runBlocking +import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.MocksTestCase import org.opensearch.indexmanagement.snapshotmanagement.engine.SMStateMachine import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMResult import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState +import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata import org.opensearch.indexmanagement.snapshotmanagement.randomSMMetadata import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy import java.time.Instant.now @@ -28,7 +30,7 @@ class CreationConditionMetStateTests : MocksTestCase() { val result = SMState.CREATION_CONDITION_MET.instance.execute(context) assertTrue("Execution result should be Next.", result is SMResult.Next) result as SMResult.Next - assertNotEquals("Next execution time should be updated.", metadata.creation.trigger.time, result.metadataToSave.build().creation.trigger.time) + assertNotEquals("Next execution time should be updated.", metadata.creation?.trigger?.time, result.metadataToSave.build().creation?.trigger?.time) } fun `test next creation time has not met`() = @@ -46,4 +48,101 @@ class CreationConditionMetStateTests : MocksTestCase() { result as SMResult.Stay assertEquals("Next execution time should not be updated.", metadata, result.metadataToSave.build()) } + + fun `test creation condition with deletion-only policy should not execute`() = + runBlocking { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_START, + trigger = SMMetadata.Trigger(time = now()), + ), + ) + val job = randomSMPolicy( + creationNull = true, // Deletion-only policy + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 3, + ) + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + // Creation condition state should not be reached for deletion-only policies + // This test ensures the state machine doesn't try to execute creation states + // when the policy has no creation workflow + + assertNull("Creation should be null for deletion-only policy", job.creation) + assertNotNull("Deletion should exist for deletion-only policy", job.deletion) + assertNull("Metadata creation should be null", metadata.creation) + assertNotNull("Metadata deletion should exist", metadata.deletion) + } + + fun `test creation condition met with null job creation should fail`() = + runBlocking { + val metadata = randomSMMetadata( + creationCurrentState = SMState.CREATION_START, + nextCreationTime = now().minusSeconds(60), + ) + val job = randomSMPolicy(creationNull = true) // Policy with null creation + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = SMState.CREATION_CONDITION_MET.instance.execute(context) + assertTrue("Execution result should be Fail when job.creation is null", result is SMResult.Fail) + result as SMResult.Fail + assertEquals("Should fail with CREATION workflow type", org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType.CREATION, result.workflowType) + } + + fun `test creation condition met with null metadata creation should initialize creation time`() = + runBlocking { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No existing creation metadata + deletion = null, + ) + val job = randomSMPolicy(creationNull = false) // Policy with creation config + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = SMState.CREATION_CONDITION_MET.instance.execute(context) + // The result could be Next or Stay depending on whether the current time has passed the next execution time + assertTrue( + "Execution result should be Next or Stay when initializing creation time", + result is SMResult.Next || result is SMResult.Stay, + ) + + val builtMetadata = when (result) { + is SMResult.Next -> result.metadataToSave.build() + is SMResult.Stay -> result.metadataToSave.build() + else -> { + fail("Unexpected result type") + error("Should not reach here") + } + } + + // Verify that creation metadata is initialized + assertNotNull("Creation metadata should be initialized", builtMetadata.creation) + assertNotNull("Creation trigger time should be set", builtMetadata.creation?.trigger?.time) + } + + fun `test creating state with null job creation should fail and reset creation`() = + runBlocking { + val job = randomSMPolicy(creationNull = true) // Policy with null creation + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = SMMetadata.WorkflowMetadata( + currentState = org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState.CREATION_START, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + ), + deletion = null, + ) + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = CreatingState.execute(context) + + assertTrue("Result should be Fail", result is SMResult.Fail) + val failResult = result as SMResult.Fail + assertEquals("Workflow type should be CREATION", org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType.CREATION, failResult.workflowType) + assertTrue("Force reset should be true", failResult.forceReset!!) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedStateTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedStateTests.kt index 0db5accff..aafbbf96a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedStateTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedStateTests.kt @@ -43,10 +43,10 @@ class CreationFinishedStateTests : MocksTestCase() { assertTrue("Execution results should be Next.", result is SMResult.Next) result as SMResult.Next val metadataToSave = result.metadataToSave.build() - assertNull("Started creation should be reset to null.", metadataToSave.creation.started) - assertEquals("Latest execution status is success", SMMetadata.LatestExecution.Status.SUCCESS, metadataToSave.creation.latestExecution!!.status) - assertNotNull("Latest execution status end_time should not be null", metadataToSave.creation.latestExecution!!.endTime) - assertNotNull("Latest execution status message should not be null", metadataToSave.creation.latestExecution!!.info!!.message) + assertNull("Started creation should be reset to null.", metadataToSave.creation?.started) + assertEquals("Latest execution status is success", SMMetadata.LatestExecution.Status.SUCCESS, metadataToSave.creation?.latestExecution!!.status) + assertNotNull("Latest execution status end_time should not be null", metadataToSave.creation?.latestExecution!!.endTime) + assertNotNull("Latest execution status message should not be null", metadataToSave.creation?.latestExecution!!.info!!.message) } fun `test creation still in progress`() = @@ -71,7 +71,7 @@ class CreationFinishedStateTests : MocksTestCase() { assertTrue("Execution results should be Stay.", result is SMResult.Stay) result as SMResult.Stay val metadataToSave = result.metadataToSave.build() - assertEquals("Started creation should not be reset.", snapshotName, metadataToSave.creation.started!!.first()) + assertEquals("Started creation should not be reset.", snapshotName, metadataToSave.creation?.started!!.first()) } fun `test creation end not successful`() = @@ -96,10 +96,10 @@ class CreationFinishedStateTests : MocksTestCase() { assertTrue("Execution results should be Fail.", result is SMResult.Fail) result as SMResult.Fail val metadataToSave = result.metadataToSave.build() - assertNull("Started creation should be reset to null.", metadataToSave.creation.started) - assertEquals("Latest execution status is failed", SMMetadata.LatestExecution.Status.FAILED, metadataToSave.creation.latestExecution!!.status) - assertNotNull("Latest execution status end_time should not be null", metadataToSave.creation.latestExecution!!.endTime) - assertNotNull("Latest execution status cause should not be null", metadataToSave.creation.latestExecution!!.info!!.cause) + assertNull("Started creation should be reset to null.", metadataToSave.creation?.started) + assertEquals("Latest execution status is failed", SMMetadata.LatestExecution.Status.FAILED, metadataToSave.creation?.latestExecution!!.status) + assertNotNull("Latest execution status end_time should not be null", metadataToSave.creation?.latestExecution!!.endTime) + assertNotNull("Latest execution status cause should not be null", metadataToSave.creation?.latestExecution!!.info!!.cause) } fun `test get snapshots exception in creation`() = @@ -123,9 +123,9 @@ class CreationFinishedStateTests : MocksTestCase() { assertTrue("Execution results should be Failure.", result is SMResult.Fail) result as SMResult.Fail val metadataToSave = result.metadataToSave.build() - assertEquals("Latest execution status is retrying", SMMetadata.LatestExecution.Status.RETRYING, metadataToSave.creation.latestExecution!!.status) - assertNull("Latest execution status end_time should be null", metadataToSave.creation.latestExecution!!.endTime) - assertNotNull("Latest execution status info should not be null", metadataToSave.creation.latestExecution!!.info) + assertEquals("Latest execution status is retrying", SMMetadata.LatestExecution.Status.RETRYING, metadataToSave.creation?.latestExecution!!.status) + assertNull("Latest execution status end_time should be null", metadataToSave.creation?.latestExecution!!.endTime) + assertNotNull("Latest execution status info should not be null", metadataToSave.creation?.latestExecution!!.info) } fun `test get snapshots empty in creation`() = @@ -149,9 +149,9 @@ class CreationFinishedStateTests : MocksTestCase() { assertTrue("Execution results should be Next.", result is SMResult.Next) result as SMResult.Next val metadataToSave = result.metadataToSave.build() - assertEquals("Latest execution status is success", SMMetadata.LatestExecution.Status.SUCCESS, metadataToSave.creation.latestExecution!!.status) - assertNotNull("Latest execution status end_time should not be null", metadataToSave.creation.latestExecution!!.endTime) - assertNotNull("Latest execution status message should not be null", metadataToSave.creation.latestExecution!!.info!!.message) + assertEquals("Latest execution status is success", SMMetadata.LatestExecution.Status.SUCCESS, metadataToSave.creation?.latestExecution!!.status) + assertNotNull("Latest execution status end_time should not be null", metadataToSave.creation?.latestExecution!!.endTime) + assertNotNull("Latest execution status message should not be null", metadataToSave.creation?.latestExecution!!.info!!.message) } fun `test creation time limit exceed`() = @@ -181,8 +181,8 @@ class CreationFinishedStateTests : MocksTestCase() { result as SMResult.Fail val metadataToSave = result.metadataToSave.build() assertTrue(result.forceReset!!) - assertEquals("Latest execution status is time limit exceed", SMMetadata.LatestExecution.Status.TIME_LIMIT_EXCEEDED, metadataToSave.creation.latestExecution!!.status) - assertNotNull("Latest execution status end_time should not be null", metadataToSave.creation.latestExecution!!.endTime) - assertNotNull("Latest execution status cause should not be null", metadataToSave.creation.latestExecution!!.info!!.cause) + assertEquals("Latest execution status is time limit exceed", SMMetadata.LatestExecution.Status.TIME_LIMIT_EXCEEDED, metadataToSave.creation?.latestExecution!!.status) + assertNotNull("Latest execution status end_time should not be null", metadataToSave.creation?.latestExecution!!.endTime) + assertNotNull("Latest execution status cause should not be null", metadataToSave.creation?.latestExecution!!.info!!.cause) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingStateTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingStateTests.kt index cf7b977ab..97facba3d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingStateTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingStateTests.kt @@ -63,7 +63,7 @@ class DeletingStateTests : MocksTestCase() { randomSMPolicy( policyName = "daily-snapshot", deletionMaxAge = TimeValue.timeValueMinutes(1), - deletionMinCount = 2, + deletionMinCount = 1, // to ensure at least one snapshot remains, as we have only two unique snapshot in mock response ) val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) @@ -194,4 +194,157 @@ class DeletingStateTests : MocksTestCase() { val metadataToSave = result.metadataToSave.build() assertNull("Deletion metadata should be null.", metadataToSave.deletion) } + + fun `test deletion with snapshot pattern combines policy and pattern snapshots`() = + runBlocking { + // Create policy snapshots and external pattern snapshots + val policySnapshot1 = mockSnapshotInfo(name = "daily-policy-snapshot-1", startTime = now().minusSeconds(8 * 24 * 3600).toEpochMilli(), policyName = "daily-policy") + val policySnapshot2 = mockSnapshotInfo(name = "daily-policy-snapshot-2", startTime = now().minusSeconds(2 * 24 * 3600).toEpochMilli(), policyName = "daily-policy") + val patternSnapshot1 = mockSnapshotInfo(name = "external-backup-1", startTime = now().minusSeconds(10 * 24 * 3600).toEpochMilli(), policyName = "some-other-policy") + val patternSnapshot2 = mockSnapshotInfo(name = "external-backup-2", startTime = now().minusSeconds(1 * 24 * 3600).toEpochMilli(), policyName = "some-other-policy") + + // Mock single call that returns all snapshots (both policy and pattern) + // The actual implementation makes 2 calls but combines results, so we mock with all snapshots + mockGetSnapshotsCall(response = mockGetSnapshotsResponse(listOf(policySnapshot1, policySnapshot2, patternSnapshot1, patternSnapshot2))) + mockDeleteSnapshotCall(response = AcknowledgedResponse(true)) + + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_CONDITION_MET, + trigger = SMMetadata.Trigger(time = now()), + ), + ) + val job = randomSMPolicy( + policyName = "daily-policy", + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 2, + snapshotPattern = "external-*", + ) + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = SMState.DELETING.instance.execute(context) + assertTrue("Execution result should be Next.", result is SMResult.Next) + result as SMResult.Next + val metadataToSave = result.metadataToSave.build() + + // Should delete old snapshots from both policy and pattern that exceed max age + val deletedSnapshots = metadataToSave.deletion?.started + assertNotNull("Deletion started field should be initialized.", deletedSnapshots) + assertTrue("Should delete old policy snapshot", deletedSnapshots?.contains("daily-policy-snapshot-1") == true) + assertTrue("Should delete old pattern snapshot", deletedSnapshots?.contains("external-backup-1") == true) + } + + fun `test deletion without pattern only processes policy snapshots`() = + runBlocking { + val policySnapshot1 = mockSnapshotInfo(name = "policy-snapshot-1", startTime = now().minusSeconds(8 * 24 * 3600).toEpochMilli(), policyName = "test-policy") + val policySnapshot2 = mockSnapshotInfo(name = "policy-snapshot-2", startTime = now().minusSeconds(1 * 24 * 3600).toEpochMilli(), policyName = "test-policy") + + mockGetSnapshotsCall(response = mockGetSnapshotsResponse(listOf(policySnapshot1, policySnapshot2))) + mockDeleteSnapshotCall(response = AcknowledgedResponse(true)) + + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_CONDITION_MET, + trigger = SMMetadata.Trigger(time = now()), + ), + ) + val job = randomSMPolicy( + policyName = "test-policy", + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 1, + snapshotPattern = null, // No pattern + ) + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = SMState.DELETING.instance.execute(context) + assertTrue("Execution result should be Next.", result is SMResult.Next) + result as SMResult.Next + val metadataToSave = result.metadataToSave.build() + + // Should only delete old policy snapshot + val deletedSnapshots = metadataToSave.deletion?.started + assertNotNull("Deletion started field should be initialized.", deletedSnapshots) + assertEquals("Should delete 1 snapshot", 1, deletedSnapshots?.size) + assertTrue("Should delete old policy snapshot", deletedSnapshots?.contains("policy-snapshot-1") == true) + } + + fun `test deletion with pattern respects conditions across combined snapshots`() = + runBlocking { + val policySnapshot = mockSnapshotInfo(name = "policy-snapshot-1", startTime = now().minusSeconds(8 * 24 * 3600).toEpochMilli(), policyName = "test-policy") + val patternSnapshot = mockSnapshotInfo(name = "external-backup-1", startTime = now().minusSeconds(10 * 24 * 3600).toEpochMilli(), policyName = "some-other-policy") + + // Mock single call that returns all snapshots (both policy and pattern) + mockGetSnapshotsCall(response = mockGetSnapshotsResponse(listOf(policySnapshot, patternSnapshot))) + + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_CONDITION_MET, + trigger = SMMetadata.Trigger(time = now()), + ), + ) + val job = randomSMPolicy( + policyName = "test-policy", + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 2, // Require keeping 2 snapshots total + snapshotPattern = "external-*", + ) + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = SMState.DELETING.instance.execute(context) + assertTrue("Execution result should be Next.", result is SMResult.Next) + result as SMResult.Next + val metadataToSave = result.metadataToSave.build() + + // Should not delete any snapshots to maintain min count + assertNull("Should not delete snapshots due to min count constraint", metadataToSave.deletion?.started) + } + + fun `test deletion-only policy works without creation workflow`() = + runBlocking { + val oldSnapshot = mockSnapshotInfo(name = "old_snapshot", startTime = now().minusSeconds(8 * 24 * 3600).toEpochMilli(), policyName = "deletion-only-policy") + val newSnapshot = mockSnapshotInfo(name = "new_snapshot", startTime = now().toEpochMilli(), policyName = "deletion-only-policy") + mockGetSnapshotsCall(response = mockGetSnapshotsResponse(listOf(oldSnapshot, newSnapshot))) + mockDeleteSnapshotCall(response = AcknowledgedResponse(true)) + + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_CONDITION_MET, + trigger = SMMetadata.Trigger(time = now()), + ), + ) + val job = randomSMPolicy( + policyName = "deletion-only-policy", + creationNull = true, // No creation + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 1, + ) + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = SMState.DELETING.instance.execute(context) + assertTrue("Execution result should be Next.", result is SMResult.Next) + result as SMResult.Next + val metadataToSave = result.metadataToSave.build() + + // Should work fine without creation workflow + assertNull("Creation workflow should be null", metadataToSave.creation) + assertNotNull("Deletion workflow should exist", metadataToSave.deletion) + val deletedSnapshots = metadataToSave.deletion?.started + assertNotNull("Should delete old snapshot", deletedSnapshots) + assertTrue("Should delete old snapshot", deletedSnapshots?.contains("old_snapshot") == true) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedStateTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedStateTests.kt index c3334f7ba..a38f7ccaa 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedStateTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedStateTests.kt @@ -6,6 +6,7 @@ package org.opensearch.indexmanagement.snapshotmanagement.engine.states.deletion import kotlinx.coroutines.runBlocking +import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.MocksTestCase import org.opensearch.indexmanagement.snapshotmanagement.engine.SMStateMachine @@ -18,6 +19,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.randomLatestExecution import org.opensearch.indexmanagement.snapshotmanagement.randomSMMetadata import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy import java.time.Instant +import java.time.Instant.now class DeletionFinishedStateTests : MocksTestCase() { fun `test deletion succeed`() = @@ -127,4 +129,88 @@ class DeletionFinishedStateTests : MocksTestCase() { assertNotNull("Latest execution status end_time should not be null", metadataToSave.deletion!!.latestExecution!!.endTime) assertNotNull("Latest execution status cause should not be null", metadataToSave.deletion!!.latestExecution!!.info!!.cause) } + + fun `test deletion finished with pattern snapshots successful`() = + runBlocking { + val deletedSnapshot1 = "policy-snapshot-1" + val deletedSnapshot2 = "external-backup-1" + val snapshotNames = listOf(deletedSnapshot1, deletedSnapshot2) + + mockDeleteSnapshotCall(response = AcknowledgedResponse(true)) + // Mock the getSnapshots calls that DeletionFinishedState makes - return empty (snapshots deleted) + mockGetSnapshotsCall(response = mockGetSnapshotResponse(0)) // No snapshots found + + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETING, + trigger = SMMetadata.Trigger(time = now()), + started = snapshotNames, + latestExecution = randomLatestExecution( + startTime = Instant.now(), + ), + ), + ) + val job = randomSMPolicy( + policyName = "test-policy", + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 1, + snapshotPattern = "external-*", + ) + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = SMState.DELETION_FINISHED.instance.execute(context) + assertTrue("Execution results should be Next.", result is SMResult.Next) + result as SMResult.Next + val metadataToSave = result.metadataToSave.build() + assertNull("Started deletion should be reset to null.", metadataToSave.deletion?.started) + assertEquals("Latest execution status is success", SMMetadata.LatestExecution.Status.SUCCESS, metadataToSave.deletion?.latestExecution?.status) + assertNotNull("Latest execution status end_time should not be null", metadataToSave.deletion?.latestExecution?.endTime) + assertNotNull("Latest execution status message should not be null", metadataToSave.deletion?.latestExecution?.info?.message) + } + + fun `test deletion finished with no pattern snapshots found`() = + runBlocking { + // Test case where snapshots were started for deletion but are now gone (deletion finished) + val deletedSnapshotNames = listOf("nonexistent-pattern-snapshot") + + mockDeleteSnapshotCall(response = AcknowledgedResponse(true)) + // Mock the getSnapshots calls that DeletionFinishedState makes - return empty (snapshots deleted) + mockGetSnapshotsCall(response = mockGetSnapshotResponse(0)) // No snapshots found + + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, // No creation workflow + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETING, + trigger = SMMetadata.Trigger(time = now()), + started = deletedSnapshotNames, // Snapshots that were started for deletion + latestExecution = randomLatestExecution( + startTime = Instant.now(), + ), + ), + ) + val job = randomSMPolicy( + policyName = "test-policy", + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 1, + snapshotPattern = "nonexistent-*", // Pattern that matches nothing + ) + val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) + + val result = SMState.DELETION_FINISHED.instance.execute(context) + assertTrue("Execution results should be Next.", result is SMResult.Next) + result as SMResult.Next + val metadataToSave = result.metadataToSave.build() + + // Should complete successfully - snapshots that were started for deletion are now gone + assertNull("Started deletion should be reset to null after completion.", metadataToSave.deletion?.started) + assertEquals("Latest execution status is success", SMMetadata.LatestExecution.Status.SUCCESS, metadataToSave.deletion?.latestExecution?.status) + assertNotNull("Latest execution status message should indicate deletion completed", metadataToSave.deletion?.latestExecution?.info?.message) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicyTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicyTests.kt new file mode 100644 index 000000000..13de3108b --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicyTests.kt @@ -0,0 +1,345 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.model + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy +import org.opensearch.indexmanagement.snapshotmanagement.toJsonString +import org.opensearch.test.OpenSearchTestCase + +class SMPolicyTests : OpenSearchTestCase() { + + fun `test policy with optional creation field`() { + val deletionOnlyPolicy = randomSMPolicy( + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 3, + ) + + assertNull("Creation should be null for deletion-only policy", deletionOnlyPolicy.creation) + assertNotNull("Deletion should not be null", deletionOnlyPolicy.deletion) + } + + fun `test policy with optional snapshotPattern field`() { + val policyWithPattern = randomSMPolicy( + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 3, + snapshotPattern = "backup-*", + ) + + assertEquals("Snapshot pattern should match", "backup-*", policyWithPattern.deletion?.snapshotPattern) + } + + fun `test policy serialization with optional creation`() { + val deletionOnlyPolicy = randomSMPolicy( + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 3, + snapshotPattern = "pattern-*", + ) + + val out = BytesStreamOutput() + deletionOnlyPolicy.writeTo(out) + + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val deserializedPolicy = SMPolicy(sin) + + assertNull("Deserialized creation should be null", deserializedPolicy.creation) + assertEquals("Snapshot pattern should be preserved", "pattern-*", deserializedPolicy.deletion?.snapshotPattern) + } + + fun `test policy parsing from JSON with optional fields`() { + val jsonWithOptionalFields = """ + { + "name": "test-policy", + "description": "Test policy with optional creation", + "deletion": { + "schedule": { + "cron": { + "expression": "0 0 * * *", + "timezone": "UTC" + } + }, + "condition": { + "max_age": "7d", + "min_count": 3 + }, + "snapshot_pattern": "external-*" + }, + "snapshot_config": { + "repository": "test-repo" + }, + "enabled": true + } + """.trimIndent() + + val policy = SMPolicy.parse(createParser(XContentType.JSON.xContent(), jsonWithOptionalFields), "test-policy-id") + + assertNull("Creation should be null when not specified", policy.creation) + assertNotNull("Deletion should not be null", policy.deletion) + assertEquals("Snapshot pattern should match", "external-*", policy.deletion?.snapshotPattern) + } + + fun `test policy toXContent with optional fields`() { + val policy = randomSMPolicy( + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 3, + snapshotPattern = "backup-*", + ) + + val jsonString = policy.toJsonString() + + assertFalse("JSON should not contain creation field", jsonString.contains("\"creation\"")) + assertTrue("JSON should contain deletion field", jsonString.contains("\"deletion\"")) + assertTrue("JSON should contain snapshot_pattern", jsonString.contains("\"snapshot_pattern\"")) + assertTrue("JSON should contain backup-* pattern", jsonString.contains("\"backup-*\"")) + } + + fun `test policy validation requires either creation or deletion`() { + assertThrows("Policy with neither creation nor deletion should fail validation", IllegalArgumentException::class.java) { + SMPolicy( + id = "test-id", + description = "Invalid policy", + schemaVersion = 1L, + creation = null, + deletion = null, + snapshotConfig = mapOf("repository" to "test-repo"), + jobEnabled = true, + jobLastUpdateTime = randomInstant(), + jobEnabledTime = randomInstant(), + jobSchedule = randomCronSchedule(), + ) + } + } + + fun `test deletion-only policy round-trip serialization`() { + val originalPolicy = randomSMPolicy( + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(30), + deletionMinCount = 5, + deletionMaxCount = 100, + snapshotPattern = "daily-*", + ) + + // Test JSON round-trip + val jsonString = originalPolicy.toJsonString() + val parsedPolicy = SMPolicy.parse(createParser(XContentType.JSON.xContent(), jsonString), originalPolicy.id) + + assertNull("Parsed creation should be null", parsedPolicy.creation) + assertEquals("Deletion condition should match", originalPolicy.deletion?.condition, parsedPolicy.deletion?.condition) + assertEquals("Snapshot pattern should match", originalPolicy.deletion?.snapshotPattern, parsedPolicy.deletion?.snapshotPattern) + } + + private fun randomCronSchedule() = org.opensearch.jobscheduler.spi.schedule.CronSchedule("0 0 * * *", java.time.ZoneId.of("UTC")) + + private fun randomInstant() = java.time.Instant.now() + + fun `test policy validation with null repository should throw exception`() { + val jsonWithNullRepository = """ + { + "name": "test-policy", + "creation": { + "schedule": { + "cron": { + "expression": "0 0 * * *", + "timezone": "UTC" + } + } + }, + "snapshot_config": { + "repository": null + }, + "enabled": true + } + """.trimIndent() + + val exception = assertThrows(IllegalArgumentException::class.java) { + SMPolicy.parse(createParser(XContentType.JSON.xContent(), jsonWithNullRepository), "test-policy-id") + } + assertTrue("Exception should mention repository", exception.message?.contains("repository") == true) + } + + fun `test policy validation with empty repository should throw exception`() { + val jsonWithEmptyRepository = """ + { + "name": "test-policy", + "creation": { + "schedule": { + "cron": { + "expression": "0 0 * * *", + "timezone": "UTC" + } + } + }, + "snapshot_config": { + "repository": "" + }, + "enabled": true + } + """.trimIndent() + + val exception = assertThrows(IllegalArgumentException::class.java) { + SMPolicy.parse(createParser(XContentType.JSON.xContent(), jsonWithEmptyRepository), "test-policy-id") + } + assertTrue("Exception should mention repository", exception.message?.contains("repository") == true) + } + + fun `test policy validation with null creation schedule should throw exception`() { + val jsonWithNullSchedule = """ + { + "name": "test-policy", + "creation": { + "schedule": { + "cron": { + "expression": "invalid-cron", + "timezone": "UTC" + } + } + }, + "snapshot_config": { + "repository": "test-repo" + }, + "enabled": true + } + """.trimIndent() + + val exception = assertThrows(IllegalArgumentException::class.java) { + SMPolicy.parse(createParser(XContentType.JSON.xContent(), jsonWithNullSchedule), "test-policy-id") + } + assertTrue("Exception should mention cron expression", exception.message?.contains("Cron expression") == true) + } + + fun `test policy with deletion using creation schedule when deletion schedule not provided`() { + val jsonWithDeletionUsingCreationSchedule = """ + { + "name": "test-policy", + "creation": { + "schedule": { + "cron": { + "expression": "0 0 * * *", + "timezone": "UTC" + } + } + }, + "deletion": { + "condition": { + "max_age": "7d", + "min_count": 3 + } + }, + "snapshot_config": { + "repository": "test-repo" + }, + "enabled": true + } + """.trimIndent() + + val policy = SMPolicy.parse(createParser(XContentType.JSON.xContent(), jsonWithDeletionUsingCreationSchedule), "test-policy-id") + + assertNotNull("Creation should not be null", policy.creation) + assertNotNull("Deletion should not be null", policy.deletion) + assertEquals("Deletion should use creation schedule", policy.creation?.schedule, policy.deletion?.schedule) + } + + fun `test policy serialization with older version requires creation`() { + val policy = randomSMPolicy(creationNull = false) + + val out = BytesStreamOutput() + out.version = org.opensearch.Version.V_3_1_0 + policy.writeTo(out) + + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + sin.version = org.opensearch.Version.V_3_1_0 + val deserializedPolicy = SMPolicy(sin) + + assertNotNull("Deserialized creation should not be null for older version", deserializedPolicy.creation) + } + + fun `test policy validation with null creation schedule when creation is null should pass`() { + val deletionOnlyPolicy = randomSMPolicy( + creationNull = true, + deletionMaxAge = TimeValue.timeValueDays(7), + deletionMinCount = 3, + ) + + // This should not throw an exception because creation is null + assertNull("Creation should be null", deletionOnlyPolicy.creation) + assertNotNull("Deletion should not be null", deletionOnlyPolicy.deletion) + } + + fun `test policy validation with null deletion schedule when deletion is null should pass`() { + val creationOnlyPolicy = randomSMPolicy( + creationNull = false, + deletionNull = true, + ) + + // This should not throw an exception because deletion is null + assertNotNull("Creation should not be null", creationOnlyPolicy.creation) + assertNull("Deletion should be null", creationOnlyPolicy.deletion) + } + + fun `test policy with deletion using creation schedule when both creation and deletion are provided but deletion schedule is missing`() { + val jsonWithBothCreationAndDeletionButDeletionScheduleMissing = """ + { + "name": "test-policy", + "creation": { + "schedule": { + "cron": { + "expression": "0 0 * * *", + "timezone": "UTC" + } + } + }, + "deletion": { + "condition": { + "max_age": "7d", + "min_count": 3 + } + }, + "snapshot_config": { + "repository": "test-repo" + }, + "enabled": true + } + """.trimIndent() + + val policy = SMPolicy.parse(createParser(XContentType.JSON.xContent(), jsonWithBothCreationAndDeletionButDeletionScheduleMissing), "test-policy-id") + + assertNotNull("Creation should not be null", policy.creation) + assertNotNull("Deletion should not be null", policy.deletion) + assertEquals("Deletion should use creation schedule when deletion schedule is not provided", policy.creation?.schedule, policy.deletion?.schedule) + } + + fun `test policy with deletion but no creation should throw exception when deletion schedule not provided`() { + val jsonWithDeletionOnlyNoSchedule = """ + { + "name": "test-policy", + "deletion": { + "condition": { + "max_age": "7d", + "min_count": 3 + } + }, + "snapshot_config": { + "repository": "test-repo" + }, + "enabled": true + } + """.trimIndent() + + val exception = assertThrows(IllegalArgumentException::class.java) { + SMPolicy.parse(createParser(XContentType.JSON.xContent(), jsonWithDeletionOnlyNoSchedule), "test-policy-id") + } + assertTrue( + "Exception should mention schedule not provided for neither deletion nor creation policy", + exception.message?.contains("Schedule not provided for neither deletion policy nor creation policy") == true, + ) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt index cf23eb5d0..0035612c6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt @@ -5,8 +5,11 @@ package org.opensearch.indexmanagement.snapshotmanagement.model +import org.opensearch.Version import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState +import org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType import org.opensearch.indexmanagement.snapshotmanagement.randomNotificationConfig import org.opensearch.indexmanagement.snapshotmanagement.randomSMMetadata import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy @@ -28,4 +31,240 @@ class WriteableTests : OpenSearchTestCase() { val streamedSMMetadata = SMMetadata(sin) assertEquals("Round tripping sm metadata stream doesn't work", smMetadata, streamedSMMetadata) } + + fun `test sm metadata as stream with older version requires creation`() { + // Ensure creation is non-null for older versions + val smMetadata = randomSMMetadata() + val out = BytesStreamOutput().also { + it.version = Version.V_3_1_0 + smMetadata.writeTo(it) + } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes).also { it.version = Version.V_3_1_0 } + val streamedSMMetadata = SMMetadata(sin) + assertEquals("Round tripping sm metadata (older version) doesn't work", smMetadata, streamedSMMetadata) + } + + fun `test sm metadata as stream with newer version allows nullable creation`() { + // Build metadata with nullable creation and non-null deletion + val smMetadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_START, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + ), + ) + val out = BytesStreamOutput().also { + it.version = Version.V_3_3_0 + smMetadata.writeTo(it) + } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes).also { it.version = Version.V_3_3_0 } + val streamedSMMetadata = SMMetadata(sin) + assertEquals("Round tripping sm metadata (newer version) doesn't work", smMetadata, streamedSMMetadata) + } + + fun `test sm metadata builder getStartedSnapshots with null creation`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, + deletion = null, + ) + val builder = SMMetadata.Builder(metadata).workflow(WorkflowType.CREATION) + assertNull("getStartedSnapshots should return null when creation is null", builder.getStartedSnapshots()) + } + + fun `test sm metadata builder getStartedSnapshots with null deletion`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, + deletion = null, + ) + val builder = SMMetadata.Builder(metadata).workflow(WorkflowType.DELETION) + assertNull("getStartedSnapshots should return null when deletion is null", builder.getStartedSnapshots()) + } + + fun `test sm metadata builder resetRetry with null creation retry`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = SMMetadata.WorkflowMetadata( + currentState = SMState.CREATION_START, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + retry = null, + ), + deletion = null, + ) + val builder = SMMetadata.Builder(metadata).workflow(WorkflowType.CREATION) + val result = builder.resetRetry() + assertEquals("resetRetry should not change metadata when retry is null", metadata, result.build()) + } + + fun `test sm metadata builder resetRetry with null deletion retry`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, + deletion = SMMetadata.WorkflowMetadata( + currentState = org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState.DELETION_START, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + retry = null, + ), + ) + val builder = SMMetadata.Builder(metadata).workflow(org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType.DELETION) + val result = builder.resetRetry() + assertEquals("resetRetry should not change metadata when retry is null", metadata, result.build()) + } + + fun `test sm metadata builder resetRetry with non-null creation retry`() { + val originalRetry = SMMetadata.Retry(count = 3) + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = SMMetadata.WorkflowMetadata( + currentState = org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState.CREATION_START, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + retry = originalRetry, + ), + deletion = null, + ) + val builder = SMMetadata.Builder(metadata).workflow(org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType.CREATION) + val result = builder.resetRetry() + val builtMetadata = result.build() + assertNull("resetRetry should set retry to null", builtMetadata.creation?.retry) + assertEquals("resetRetry should preserve other creation fields", metadata.creation?.currentState, builtMetadata.creation?.currentState) + assertEquals("resetRetry should preserve trigger", metadata.creation?.trigger, builtMetadata.creation?.trigger) + } + + fun `test sm metadata builder resetRetry with non-null deletion retry`() { + val originalRetry = SMMetadata.Retry(count = 2) + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, + deletion = SMMetadata.WorkflowMetadata( + currentState = org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState.DELETION_START, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + retry = originalRetry, + ), + ) + val builder = SMMetadata.Builder(metadata).workflow(org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType.DELETION) + val result = builder.resetRetry() + val builtMetadata = result.build() + assertNull("resetRetry should set retry to null", builtMetadata.deletion?.retry) + assertEquals("resetRetry should preserve other deletion fields", metadata.deletion?.currentState, builtMetadata.deletion?.currentState) + assertEquals("resetRetry should preserve trigger", metadata.deletion?.trigger, builtMetadata.deletion?.trigger) + } + + fun `test sm metadata builder setNextCreationTime with null creation`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, + deletion = null, + ) + val builder = SMMetadata.Builder(metadata) + val newTime = java.time.Instant.now().plusSeconds(3600) + val result = builder.setNextCreationTime(newTime) + val builtMetadata = result.build() + assertNotNull("setNextCreationTime should create new creation when null", builtMetadata.creation) + assertEquals("setNextCreationTime should set correct time", newTime, builtMetadata.creation?.trigger?.time) + assertEquals("setNextCreationTime should set correct state", org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState.CREATION_START, builtMetadata.creation?.currentState) + } + + fun `test sm metadata builder resetWorkflow with creation workflow should reset creation state`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = SMMetadata.WorkflowMetadata( + currentState = SMState.CREATION_FINISHED, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + started = listOf("snapshot-1"), + retry = SMMetadata.Retry(count = 1), + ), + deletion = null, + ) + + val builtMetadata = SMMetadata.Builder(metadata) + .workflow(WorkflowType.CREATION) + .resetWorkflow() + .build() + + assertEquals("Creation state should be reset to CREATION_START", SMState.CREATION_START, builtMetadata.creation?.currentState) + assertNull("Creation started should be reset to null", builtMetadata.creation?.started) + assertNull("Creation retry should be reset to null", builtMetadata.creation?.retry) + } + + fun `test sm metadata builder resetWorkflow with deletion workflow should reset deletion state`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = null, + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_FINISHED, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + started = listOf("snapshot-1"), + retry = SMMetadata.Retry(count = 1), + ), + ) + + val builtMetadata = SMMetadata.Builder(metadata) + .workflow(WorkflowType.DELETION) + .resetWorkflow() + .build() + + assertEquals("Deletion state should be reset to DELETION_START", SMState.DELETION_START, builtMetadata.deletion?.currentState) + assertNull("Deletion started should be reset to null", builtMetadata.deletion?.started) + assertNull("Deletion retry should be reset to null", builtMetadata.deletion?.retry) + } + + fun `test sm metadata builder resetWorkflow with both workflows should reset both states`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = SMMetadata.WorkflowMetadata( + currentState = SMState.CREATION_FINISHED, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + started = listOf("snapshot-1"), + retry = SMMetadata.Retry(count = 1), + ), + deletion = SMMetadata.WorkflowMetadata( + currentState = SMState.DELETION_FINISHED, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + started = listOf("snapshot-2"), + retry = SMMetadata.Retry(count = 1), + ), + ) + + val builtMetadata = SMMetadata.Builder(metadata) + .workflow(WorkflowType.CREATION) + .resetWorkflow() + .build() + + assertEquals("Creation state should be reset to CREATION_START", SMState.CREATION_START, builtMetadata.creation?.currentState) + assertNull("Creation started should be reset to null", builtMetadata.creation?.started) + assertNull("Creation retry should be reset to null", builtMetadata.creation?.retry) + // Deletion should remain unchanged since we're resetting creation workflow + assertEquals("Deletion state should remain unchanged", SMState.DELETION_FINISHED, builtMetadata.deletion?.currentState) + assertNotNull("Deletion started should remain unchanged", builtMetadata.deletion?.started) + assertNotNull("Deletion retry should remain unchanged", builtMetadata.deletion?.retry) + } + + fun `test sm metadata builder resetCreation should set creation to null`() { + val metadata = SMMetadata( + policySeqNo = 1L, + policyPrimaryTerm = 1L, + creation = SMMetadata.WorkflowMetadata( + currentState = SMState.CREATION_START, + trigger = SMMetadata.Trigger(time = java.time.Instant.now()), + ), + deletion = null, + ) + val builder = SMMetadata.Builder(metadata) + val result = builder.resetCreation() + val builtMetadata = result.build() + assertNull("resetCreation should set creation to null", builtMetadata.creation) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestExplainSnapshotManagementIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestExplainSnapshotManagementIT.kt index ce64e240f..d9390c72a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestExplainSnapshotManagementIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestExplainSnapshotManagementIT.kt @@ -46,7 +46,7 @@ class RestExplainSnapshotManagementIT : SnapshotManagementRestTestCase() { assertTrue("Explain response did not contain policy creation details", explainPolicyMap.containsKey(SMMetadata.CREATION_FIELD)) val creationField = explainPolicyMap[SMMetadata.CREATION_FIELD] as Map val creationTriggerField = creationField[TRIGGER_FIELD] as Map - val expectedCreationTime = smPolicy.creation.schedule.getNextExecutionTime(now()).toEpochMilli() + val expectedCreationTime = smPolicy.creation?.schedule?.getNextExecutionTime(now())?.toEpochMilli() assertEquals("Policy creation trigger time didn't match", expectedCreationTime, creationTriggerField[SMMetadata.Trigger.TIME_FIELD]) val deletionField = explainPolicyMap[SMMetadata.DELETION_FIELD] as Map val deletionTriggerField = deletionField[TRIGGER_FIELD] as Map diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt index f40467f81..1a1800d51 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt @@ -147,4 +147,46 @@ class RestIndexSnapshotManagementIT : SnapshotManagementRestTestCase() { assertEquals("Mappings are different", expectedMap, mappingsMap) } + + @Suppress("UNCHECKED_CAST") + fun `test creating a deletion-only policy with snapshot pattern`() { + // Create a deletion-only policy with snapshot pattern support + var smPolicy = randomSMPolicy( + creationNull = true, // No creation workflow + deletionMaxAge = org.opensearch.common.unit.TimeValue.timeValueDays(30), + deletionMinCount = 3, + snapshotPattern = "external-backup-*", // Include external snapshots in deletion + ) + + val response = client().makeRequest("POST", "$SM_POLICIES_URI/${smPolicy.policyName}", emptyMap(), smPolicy.toHttpEntity()) + assertEquals("Create deletion-only SM policy failed", RestStatus.CREATED, response.restStatus()) + + val responseBody = response.asMap() + val createdId = responseBody["_id"] as String + assertNotEquals("Response is missing Id", NO_ID, createdId) + assertEquals("Not same id", smPolicy.id, createdId) + assertEquals("Incorrect Location header", "$SM_POLICIES_URI/${smPolicy.policyName}", response.getHeader("Location")) + + val responseSMPolicy = responseBody[SM_TYPE] as Map + // During indexing, we update these two fields so we need to override them here before the equality check + smPolicy = smPolicy.copy( + jobLastUpdateTime = Instant.ofEpochMilli(responseSMPolicy[SMPolicy.LAST_UPDATED_TIME_FIELD] as Long), + schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, + ) + + // Verify the policy structure + val createdPolicyMap = smPolicy.toMap(XCONTENT_WITHOUT_TYPE_AND_USER) + assertEquals("Created and returned deletion-only policies differ", createdPolicyMap, responseSMPolicy) + + // Verify creation is null and deletion has snapshot pattern + assertNull("Creation should be null for deletion-only policy", responseSMPolicy["creation"]) + assertNotNull("Deletion should exist for deletion-only policy", responseSMPolicy["deletion"]) + + val deletionConfig = responseSMPolicy["deletion"] as Map + assertEquals("Snapshot pattern should be preserved", "external-backup-*", deletionConfig["snapshot_pattern"]) + + val deletionCondition = deletionConfig["condition"] as Map + assertEquals("Max age should be 30 days", "30d", deletionCondition["max_age"]) + assertEquals("Min count should be 3", 3, deletionCondition["min_count"]) + } } diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 845bfe34c..6250ba15d 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1497,6 +1497,9 @@ "type": "integer" } } + }, + "snapshot_pattern": { + "type": "keyword" } } },