diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt index 906a8b706..f72d7e9ad 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt @@ -7,10 +7,12 @@ package org.opensearch.indexmanagement.snapshotmanagement.engine import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger +import org.opensearch.ExceptionsHelper import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.commons.ConfigConstants +import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext import org.opensearch.indexmanagement.opensearchapi.retry @@ -215,6 +217,14 @@ class SMStateMachine( metadata = md } } catch (ex: Exception) { + val unwrappedException = ExceptionsHelper.unwrapCause(ex) as Exception + if (unwrappedException is VersionConflictEngineException) { + // Don't throw the exception + // TODO: Extract seqNo on VersionConflictException and retry updateMetadata with updated seqNo. + log.error("Version conflict exception while updating metadata.", ex) + return + } + val smEx = SnapshotManagementException(ExceptionKey.METADATA_INDEXING_FAILURE, ex) log.error(smEx.message, ex) throw smEx diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt index 843b441b8..7efa82bb8 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt @@ -5,13 +5,23 @@ package org.opensearch.indexmanagement.snapshotmanagement.engine +import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.argumentCaptor +import com.nhaarman.mockitokotlin2.doAnswer import com.nhaarman.mockitokotlin2.spy import com.nhaarman.mockitokotlin2.times import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking +import org.opensearch.OpenSearchException import org.opensearch.common.unit.TimeValue +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.index.shard.ShardId +import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.indexmanagement.MocksTestCase +import org.opensearch.indexmanagement.opensearchapi.retry +import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementException import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState import org.opensearch.indexmanagement.snapshotmanagement.engine.states.creationTransitions import org.opensearch.indexmanagement.snapshotmanagement.engine.states.deletionTransitions @@ -230,4 +240,66 @@ open class SMStateMachineTests : MocksTestCase() { assertEquals(1, firstValue.policyPrimaryTerm) } } + + fun `test updateMetadata handles VersionConflictEngineException gracefully`() = runBlocking { + val initialMetadata = randomSMMetadata( + policySeqNo = 0, + policyPrimaryTerm = 0, + ) + val smPolicy = randomSMPolicy( + seqNo = 1, + primaryTerm = 1, + ) + val updatedMetadata = randomSMMetadata( + policySeqNo = 1, + policyPrimaryTerm = 1, + ) + + doAnswer { + val listener = it.getArgument>(1) + listener.onFailure(VersionConflictEngineException(ShardId("index", "_na_", 1), "test", "message")) + }.whenever(client).index(any(), any()) + + val stateMachineSpy = spy(SMStateMachine(client, smPolicy, initialMetadata, settings, threadPool, indicesManager)) + + // Verify VersionConflictEngineException is handled gracefully + try { + stateMachineSpy.updateMetadata(updatedMetadata) + } catch (e: Exception) { + fail("VersionConflictEngineException should be handled without throwing: ${e.message}") + } + } + + fun `test updateMetadata throws SnapshotManagementException for other exceptions`() = runBlocking { + val initialMetadata = randomSMMetadata( + policySeqNo = 0, + policyPrimaryTerm = 0, + ) + val smPolicy = randomSMPolicy( + seqNo = 1, + primaryTerm = 1, + ) + val updatedMetadata = randomSMMetadata( + policySeqNo = 1, + policyPrimaryTerm = 1, + ) + + val stateMachineSpy = spy(SMStateMachine(client, smPolicy, initialMetadata, settings, threadPool, indicesManager)) + + val openSearchException = OpenSearchException("Test exception") + doAnswer { + val listener = it.getArgument>(1) + listener.onFailure(openSearchException) + }.whenever(client).index(any(), any()) + + // Verify OpenSearchException is wrapped in SnapshotManagementException + val thrownException = assertThrows(SnapshotManagementException::class.java) { + runBlocking { + stateMachineSpy.updateMetadata(updatedMetadata) + } + } + + // Verify exception type and cause + assertTrue(thrownException.cause is OpenSearchException) + } }