Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,20 @@ object SMRunner :

// creation, deletion workflow have to be executed sequentially,
// because they are sharing the same metadata document.
SMStateMachine(client, job, metadata, settings, threadPool, indicesManager)
val stateMachine = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager)
.handlePolicyChange()
.currentState(metadata.creation.currentState)
.next(creationTransitions)
.apply {
val deleteMetadata = metadata.deletion
if (deleteMetadata != null) {
this.currentState(deleteMetadata.currentState)
.next(deletionTransitions)
}
}

// Execute creation workflow if it exists
if (metadata.creation != null) {
stateMachine.currentState(metadata.creation.currentState)
.next(creationTransitions)
}

// Execute deletion workflow if it exists
if (metadata.deletion != null) {
stateMachine.currentState(metadata.deletion.currentState)
.next(deletionTransitions)
}
} finally {
if (!releaseLockForScheduledJob(context, lock)) {
log.error("Could not release lock [${lock.lockId}] for ${job.id}.")
Expand Down Expand Up @@ -155,13 +158,14 @@ object SMRunner :
id = smPolicyNameToMetadataDocId(smDocIdToPolicyName(job.id)),
policySeqNo = job.seqNo,
policyPrimaryTerm = job.primaryTerm,
creation =
SMMetadata.WorkflowMetadata(
SMState.CREATION_START,
SMMetadata.Trigger(
time = job.creation.schedule.getNextExecutionTime(now),
),
),
creation = job.creation?.let {
SMMetadata.WorkflowMetadata(
SMState.CREATION_START,
SMMetadata.Trigger(
time = job.creation.schedule.getNextExecutionTime(now),
),
)
},
deletion =
job.deletion?.let {
SMMetadata.WorkflowMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.common.regex.Regex
import org.opensearch.common.time.DateFormatter
import org.opensearch.common.time.DateFormatters
import org.opensearch.common.unit.TimeValue
Expand Down Expand Up @@ -240,7 +241,7 @@ suspend fun Client.getSnapshots(
snapshotMissingMsg: String?,
exceptionMsg: String,
): GetSnapshotsResult {
val snapshots =
var snapshots =
try {
getSnapshots(
name,
Expand All @@ -257,7 +258,19 @@ suspend fun Client.getSnapshots(
cause = ex,
)
return GetSnapshotsResult(true, emptyList(), metadataBuilder)
}.filterBySMPolicyInSnapshotMetadata(job.policyName)
}

// Parse CSV patterns from name and implement pattern-based filtering
val patterns = name.split(",").map { it.trim() }
val policyNamePattern = "${job.policyName}*"
// Filter other snapshots by policy metadata
val otherPatternSnapshots = snapshots.filter { snapshot ->
patterns.any { pattern ->
pattern != policyNamePattern && Regex.simpleMatch(pattern, snapshot.snapshotId().name)
}
}
val policySnapshots = snapshots.filterBySMPolicyInSnapshotMetadata(job.policyName)
snapshots = policySnapshots + otherPatternSnapshots

return GetSnapshotsResult(false, snapshots, metadataBuilder)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class SMStateMachine(
val retry =
when (result.workflowType) {
WorkflowType.CREATION -> {
metadata.creation.retry
metadata.creation?.retry
}
WorkflowType.DELETION -> {
metadata.deletion?.retry
Expand Down Expand Up @@ -249,7 +249,11 @@ class SMStateMachine(
val metadataToSave =
SMMetadata.Builder(metadata)
.setSeqNoPrimaryTerm(job.seqNo, job.primaryTerm)
.setNextCreationTime(job.creation.schedule.getNextExecutionTime(now))

val creation = job.creation
creation?.let {
metadataToSave.setNextCreationTime(creation.schedule.getNextExecutionTime(now))
}

val deletion = job.deletion
deletion?.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ object CreatingState : State {
SMMetadata.Builder(metadata)
.workflow(WorkflowType.CREATION)

var snapshotName: String? = metadata.creation.started?.first()
if (job.creation == null) {
log.warn("Policy creation config becomes null before trying to create snapshot. Reset.")
return SMResult.Fail(
metadataBuilder.resetCreation(), WorkflowType.CREATION, true,
)
}

var snapshotName: String? = metadata.creation?.started?.first()

// Check if there's already a snapshot created by SM in current execution period.
// So that this State can be executed idempotent.
Expand All @@ -54,8 +61,8 @@ object CreatingState : State {
}
val getSnapshots = getSnapshotsResult.snapshots

val latestExecutionStartTime = job.creation.schedule.getPeriodStartingAt(null).v1()
snapshotName = checkCreatedSnapshots(latestExecutionStartTime, getSnapshots)
val latestExecutionStartTime = job.creation?.schedule?.getPeriodStartingAt(null)?.v1()
snapshotName = latestExecutionStartTime?.let { checkCreatedSnapshots(it, getSnapshots) }
if (snapshotName != null) {
log.info("Already created snapshot [$snapshotName] during this execution period starting at $latestExecutionStartTime.")
metadataBuilder.setLatestExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.tryUpdatingNextExecutionTime
import java.time.Instant.now

object CreationConditionMetState : State {
override val continuous = true

@Suppress("ReturnCount")
override suspend fun execute(context: SMStateMachine): SMResult {
val job = context.job
val metadata = context.metadata
Expand All @@ -24,15 +26,35 @@ object CreationConditionMetState : State {
SMMetadata.Builder(metadata)
.workflow(WorkflowType.CREATION)

val nextCreationTime = metadata.creation.trigger.time
val updateNextTimeResult =
tryUpdatingNextExecutionTime(
metadataBuilder, nextCreationTime, job.creation.schedule, WorkflowType.CREATION, log,
if (job.creation == null) {
log.warn("Policy creation config becomes null before trying to create snapshot. Reset.")
return SMResult.Fail(
metadataBuilder.resetCreation(), WorkflowType.CREATION, true,
)
if (!updateNextTimeResult.updated) {
return SMResult.Stay(metadataBuilder)
}
metadataBuilder = updateNextTimeResult.metadataBuilder

// if job.creation != null, then metadata.creation.trigger.time should already be
// initialized or handled in handlePolicyChange before executing this state.
val nextCreationTime = if (metadata.creation == null) {
val nextTime = job.creation.schedule.getNextExecutionTime(now())
nextTime?.let { metadataBuilder.setNextCreationTime(it) }
nextTime
} else {
metadata.creation.trigger.time
}

nextCreationTime?.let { creationTime ->
job.creation.schedule.let { schedule ->
val updateNextTimeResult =
tryUpdatingNextExecutionTime(
metadataBuilder, creationTime, schedule, WorkflowType.CREATION, log,
)
if (!updateNextTimeResult.updated) {
return SMResult.Stay(metadataBuilder)
}
metadataBuilder = updateNextTimeResult.metadataBuilder
}
}

return SMResult.Next(metadataBuilder)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.time.Instant.now
object CreationFinishedState : State {
override val continuous = true

@Suppress("ReturnCount", "LongMethod", "NestedBlockDepth")
@Suppress("ReturnCount", "LongMethod", "NestedBlockDepth", "CyclomaticComplexMethod")
override suspend fun execute(context: SMStateMachine): SMResult {
val client = context.client
val job = context.job
Expand All @@ -32,8 +32,8 @@ object CreationFinishedState : State {
SMMetadata.Builder(metadata)
.workflow(WorkflowType.CREATION)

metadata.creation.started?.first()?.let { snapshotName ->
if (metadata.creation.latestExecution == null) {
metadata.creation?.started?.first()?.let { snapshotName ->
if (metadata.creation?.latestExecution == null) {
// This should not happen
log.error("latest_execution is null while checking if snapshot [$snapshotName] creation has finished. Reset.")
metadataBuilder.resetWorkflow()
Expand Down Expand Up @@ -75,8 +75,8 @@ object CreationFinishedState : State {
job.notificationConfig?.sendCreationNotification(client, job.policyName, creationMessage, job.user, log)
}
SnapshotState.IN_PROGRESS -> {
job.creation.timeLimit?.let { timeLimit ->
if (timeLimit.isExceed(metadata.creation.latestExecution.startTime)) {
job.creation?.timeLimit?.let { timeLimit ->
if (timeLimit.isExceed(metadata.creation?.latestExecution?.startTime)) {
return timeLimitExceeded(
timeLimit, metadataBuilder, WorkflowType.CREATION, log,
)
Expand All @@ -98,18 +98,22 @@ object CreationFinishedState : State {

// if now is after next creation time, update nextCreationTime to next execution schedule
// TODO may want to notify user that we skipped the execution because snapshot creation time is longer than execution schedule
val result =
tryUpdatingNextExecutionTime(
metadataBuilder, metadata.creation.trigger.time, job.creation.schedule,
WorkflowType.CREATION, log,
)
if (result.updated) {
metadataBuilder = result.metadataBuilder
metadata.creation?.trigger?.time?.let { triggerTime ->
job.creation?.schedule?.let { schedule ->
val result =
tryUpdatingNextExecutionTime(
metadataBuilder, triggerTime, schedule,
WorkflowType.CREATION, log,
)
if (result.updated) {
metadataBuilder = result.metadataBuilder
}
}
}
}

val metadataToSave = metadataBuilder.build()
if (metadataToSave.creation.started != null) {
if (metadataToSave.creation?.started != null) {
return SMResult.Stay(metadataBuilder)
}
return SMResult.Next(metadataBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,28 @@ object DeletingState : State {

val snapshotsToDelete: List<String>

var snapshotPattern = job.policyName + "*"

if (job.deletion?.snapshotPattern != null) {
snapshotPattern += "," + job.deletion.snapshotPattern
}
val getSnapshotsRes =
client.getSnapshots(
job, job.policyName + "*", metadataBuilder, log,
job, snapshotPattern, metadataBuilder, log,
getSnapshotsMissingMessage(),
getSnapshotsErrorMessage(),
)
metadataBuilder = getSnapshotsRes.metadataBuilder
if (getSnapshotsRes.failed) {
return SMResult.Fail(metadataBuilder, WorkflowType.DELETION)
}
val getSnapshots = getSnapshotsRes.snapshots
val snapshots = getSnapshotsRes.snapshots
.distinctBy { it.snapshotId().name }
.filter { it.state() != SnapshotState.IN_PROGRESS }

snapshotsToDelete =
filterByDeleteCondition(
getSnapshots.filter { it.state() != SnapshotState.IN_PROGRESS },
snapshots,
job.deletion.condition, log,
)
if (snapshotsToDelete.isNotEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,24 @@ object DeletionFinishedState : State {
return@let
}

var snapshotPattern = job.policyName + "*"

if (job.deletion?.snapshotPattern != null) {
snapshotPattern += "," + job.deletion.snapshotPattern
}
val getSnapshotsRes =
client.getSnapshots(
job, "${job.policyName}*", metadataBuilder, log,
job, snapshotPattern, metadataBuilder, log,
getSnapshotMissingMessageInDeletionWorkflow(),
getSnapshotExceptionInDeletionWorkflow(snapshotsStartedDeletion),
)
metadataBuilder = getSnapshotsRes.metadataBuilder
if (getSnapshotsRes.failed) {
return SMResult.Fail(metadataBuilder, WorkflowType.DELETION)
}
val getSnapshots = getSnapshotsRes.snapshots
val snapshots = getSnapshotsRes.snapshots.distinctBy { it.snapshotId().name }

val existingSnapshotsNameSet = getSnapshots.map { it.snapshotId().name }.toSet()
val existingSnapshotsNameSet = snapshots.map { it.snapshotId().name }.toSet()
val remainingSnapshotsName = existingSnapshotsNameSet intersect snapshotsStartedDeletion.toSet()
if (remainingSnapshotsName.isEmpty()) {
val deletionMessage = "Snapshot(s) $snapshotsStartedDeletion deletion has finished."
Expand Down
Loading
Loading