Skip to content

Commit d64d90c

Browse files
authored
Indices deletion in leader cluster do not get replicated in follower cluster (#1556)
* Indices deletion in leader cluster do not get replicated in follower cluster Introduced a new setting parameter 'plugins.replication.replicate.delete_index'. If this propert's value is set to true, it will replicate the deletion of leader indecx to follower, othwise it will just pause the replication. Resolves #1434 Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]> * Indices deletion in leader cluster do not get replicated in follower cluster Introduced a new setting parameter 'plugins.replication.replicate.delete_index'. If this propert's value is set to true, it will replicate the deletion of leader indecx to follower, othwise it will just pause the replication. Resolves #1434 Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]> * Added a check of whether leader index is deleted or not Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]> * Moved plugins.replication.replicate.delete_index setting reference to ReplicationSettings.k Added a new test case with by setting plugins.replication.replicate.delete_index as false Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]> --------- Signed-off-by: Abdul Muneer Kolarkunnu <[email protected]>
1 parent 1501340 commit d64d90c

File tree

5 files changed

+150
-3
lines changed

5 files changed

+150
-3
lines changed

src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
203203
Setting.Property.Dynamic, Setting.Property.NodeScope)
204204
val REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.autofollow.concurrent_replication_jobs_trigger_size", 3, 1, 10,
205205
Setting.Property.Dynamic, Setting.Property.NodeScope)
206+
val REPLICATION_REPLICATE_INDEX_DELETION: Setting<Boolean> = Setting.boolSetting("plugins.replication.replicate.delete_index", false,
207+
Setting.Property.Dynamic, Setting.Property.NodeScope)
206208
}
207209

208210
override fun createComponents(client: Client, clusterService: ClusterService, threadPool: ThreadPool,
@@ -363,7 +365,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
363365
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL,
364366
REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING,
365367
REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE,
366-
REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD)
368+
REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD, REPLICATION_REPLICATE_INDEX_DELETION)
367369
}
368370
override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
369371
clusterService: ClusterService, recoverySettings: RecoverySettings): Map<String, Repository.Factory> {

src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ open class ReplicationSettings(clusterService: ClusterService) {
3333
@Volatile var leaseRenewalMaxFailureDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION)
3434
@Volatile var followerBlockStart: Boolean = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START)
3535
@Volatile var autofollowConcurrentJobsTriggerSize: Int = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE)
36+
@Volatile var replicateIndexDeletion: Boolean = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_REPLICATE_INDEX_DELETION)
3637

3738
init {
3839
listenForUpdates(clusterService.clusterSettings)
@@ -51,5 +52,6 @@ open class ReplicationSettings(clusterService: ClusterService) {
5152
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL) { metadataSyncInterval = it }
5253
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START) { followerBlockStart = it }
5354
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE) { autofollowConcurrentJobsTriggerSize = it }
55+
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_REPLICATE_INDEX_DELETION) { replicateIndexDeletion = it }
5456
}
5557
}

src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import org.opensearch.core.action.ActionListener
5252
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
5353
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions
5454
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest
55+
import org.opensearch.action.admin.indices.delete.DeleteIndexAction
5556
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
5657
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
5758
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
@@ -104,6 +105,9 @@ import java.util.stream.Collectors
104105
import kotlin.coroutines.resume
105106
import kotlin.coroutines.resumeWithException
106107
import kotlin.coroutines.suspendCoroutine
108+
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE
109+
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
110+
import org.opensearch.replication.ReplicationPlugin
107111
import kotlin.streams.toList
108112
import org.opensearch.cluster.DiffableUtils
109113

@@ -134,6 +138,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
134138
override val log = Loggers.getLogger(javaClass, Index(params.followerIndexName, ClusterState.UNKNOWN_UUID))
135139
private val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), clusterService.state().metadata.clusterUUID(), remoteClient)
136140
private var shouldCallEvalMonitoring = true
141+
private var isLeaderIndexDeleted = false
137142
private var updateSettingsContinuousFailCount = 0
138143
private var updateAliasContinousFailCount = 0
139144

@@ -222,8 +227,18 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
222227
updateMetadata()
223228
}
224229
is FailedState -> {
225-
// Try pausing first if we get Failed state. This returns failed state if pause failed
226-
pauseReplication(state)
230+
// Try pausing or stopping if we get Failed state based on settings.
231+
// If index deletion replication is turned on in the settings and leader index is
232+
// not available then stop the replication, otherwise pause the replication.
233+
// This returns failed state if pause or stop failed
234+
if (replicationSettings.replicateIndexDeletion
235+
&& state.errorMsg.contains("org.opensearch.index.IndexNotFoundException - \"no such index ["
236+
+ leaderIndex.name + "]\"")) {
237+
isLeaderIndexDeleted = true
238+
stopReplication(state)
239+
} else {
240+
pauseReplication(state)
241+
}
227242
}
228243
else -> {
229244
state
@@ -682,6 +697,50 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
682697
return MonitoringState
683698
}
684699

700+
private suspend fun stopReplication(state: FailedState): IndexReplicationState {
701+
try {
702+
log.info("Going to initiate stop of index $followerIndexName due to deletion of corresponding leader index ${leaderIndex.name}")
703+
val stopReplicationResponse = client.suspendExecute(
704+
replicationMetadata,
705+
INTERNAL_STOP_REPLICATION_ACTION_TYPE, StopIndexReplicationRequest(followerIndexName),
706+
defaultContext = true
707+
)
708+
if (!stopReplicationResponse.isAcknowledged) {
709+
throw ReplicationException(
710+
"Failed to gracefully stop replication after deletion of leader index. " +
711+
"Replication tasks may need to be stopped manually and deleted follower index."
712+
)
713+
}
714+
} catch (e: CancellationException) {
715+
log.error("Encountered CancellationException while stopping $followerIndexName, ignoring it", e)
716+
} catch (e: Exception) {
717+
log.error("Encountered exception while stopping $followerIndexName", e)
718+
return FailedState(state.failedShards,
719+
"Stop failed with \"${e.message}\". Original failure for initiating stop - ${state.errorMsg}")
720+
}
721+
return CompletedState
722+
}
723+
724+
private suspend fun deleteIndex() {
725+
try {
726+
log.info("Going to initiate deletion of index $followerIndexName due to deletion of corresponding leader index ${leaderIndex.name}")
727+
val deleteIndexResponse = client.suspendExecute(
728+
replicationMetadata,
729+
DeleteIndexAction.INSTANCE, DeleteIndexRequest(followerIndexName),
730+
defaultContext = true
731+
)
732+
if (!deleteIndexResponse.isAcknowledged) {
733+
throw ReplicationException(
734+
"Failed to gracefully delete the follower index after deletion of the leader index. " +
735+
"Follower index may need to be deleted manually."
736+
)
737+
}
738+
} catch (e: Exception) {
739+
log.error("Encountered exception while deleting $followerIndexName", e)
740+
throw e
741+
}
742+
}
743+
685744
private fun findAllReplicationFailedShardTasks(followerIndexName: String, clusterState: ClusterState)
686745
:Map<ShardId, PersistentTask<ShardReplicationParams>> {
687746
val persistentTasks = clusterState.metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)
@@ -718,6 +777,12 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
718777
client.execute(PauseIndexReplicationAction.INSTANCE,
719778
PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON))
720779
}
780+
781+
// Deleting the follower index if replication is stopped because of leader index deletion
782+
if (clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_REPLICATE_INDEX_DELETION)
783+
&& currentTaskState.state == ReplicationState.COMPLETED && isLeaderIndexDeleted) {
784+
deleteIndex()
785+
}
721786
}
722787

723788
/* This is to minimise overhead of calling an additional listener as

src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,47 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
237237
`validate status syncing response`(statusResp)
238238
}, 30, TimeUnit.SECONDS)
239239
}
240+
241+
fun `test auto pause of index replication when leader index is unavailable and disabled the property replicate delete_index explicitly`() {
242+
val followerIndexName1 = "auto_pause_index"
243+
val leaderIndexName1 = "leader1"
244+
val followerIndexName2 = "no_auto_pause_index"
245+
val leaderIndexName2 = "leader2"
246+
val followerClient = getClientForCluster(FOLLOWER)
247+
val leaderClient = getClientForCluster(LEADER)
248+
249+
// Disabling the replication of delete index explicitly
250+
val settings = Settings.builder()
251+
.put("plugins.replication.replicate.delete_index", false)
252+
.build()
253+
val request = ClusterUpdateSettingsRequest()
254+
request.transientSettings(settings)
255+
followerClient.cluster().putSettings(request, RequestOptions.DEFAULT)
256+
257+
createConnectionBetweenClusters(FOLLOWER, LEADER)
258+
var createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName1), RequestOptions.DEFAULT)
259+
assertThat(createIndexResponse.isAcknowledged).isTrue()
260+
createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName2), RequestOptions.DEFAULT)
261+
assertThat(createIndexResponse.isAcknowledged).isTrue()
262+
// For followerIndexName1
263+
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName1,
264+
followerIndexName1), waitForRestore = true)
265+
// For followerIndexName2
266+
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName2,
267+
followerIndexName2), waitForRestore = true)
268+
val deleteResponse = leaderClient.indices().delete(DeleteIndexRequest(leaderIndexName1), RequestOptions.DEFAULT)
269+
assertThat(deleteResponse.isAcknowledged)
270+
// followerIndexName1 -> autopause
271+
assertBusy({
272+
var statusResp = followerClient.replicationStatus(followerIndexName1)
273+
assertThat(statusResp.containsKey("status"))
274+
assertThat(statusResp.containsKey("reason"))
275+
`validate paused status response due to leader index deleted`(statusResp)
276+
}, 30, TimeUnit.SECONDS)
277+
// followerIndexName2 -> Syncing state
278+
assertBusy({
279+
var statusResp = followerClient.replicationStatus(followerIndexName2)
280+
`validate status syncing response`(statusResp)
281+
}, 30, TimeUnit.SECONDS)
282+
}
240283
}

src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
3030
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
3131
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
3232
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
33+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
3334
import org.opensearch.action.index.IndexRequest
3435
import org.opensearch.client.Request
3536
import org.opensearch.client.RequestOptions
@@ -243,6 +244,40 @@ class StopReplicationIT: MultiClusterRestTestCase() {
243244
followerClient.index(IndexRequest(followerIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT)
244245
}
245246

247+
fun `test delete follower index when leader index is unavailable`() {
248+
val followerIndexName2 = "follower_index2"
249+
val leaderIndexName2 = "leader_index2"
250+
val followerClient = getClientForCluster(FOLLOWER)
251+
val leaderClient = getClientForCluster(LEADER)
252+
253+
// Enabling the replication of delete index
254+
val settings = Settings.builder()
255+
.put("plugins.replication.replicate.delete_index", true)
256+
.build()
257+
val request = ClusterUpdateSettingsRequest()
258+
request.transientSettings(settings)
259+
followerClient.cluster().putSettings(request, RequestOptions.DEFAULT)
260+
261+
createConnectionBetweenClusters(FOLLOWER, LEADER, "source")
262+
val createIndex1Response = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
263+
assertThat(createIndex1Response.isAcknowledged).isTrue()
264+
val createIndex2Response = leaderClient.indices().create(CreateIndexRequest(leaderIndexName2), RequestOptions.DEFAULT)
265+
assertThat(createIndex2Response.isAcknowledged).isTrue()
266+
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName),
267+
waitForRestore = true)
268+
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName2, followerIndexName2),
269+
waitForRestore = true)
270+
271+
val deleteResponse = leaderClient.indices().delete(DeleteIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
272+
assertThat(deleteResponse.isAcknowledged)
273+
274+
// Make sure follower index got deleted after it is deleted from the leader, and it didn't affect any other indexes
275+
assertBusy({
276+
Assert.assertFalse(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
277+
}, 30, TimeUnit.SECONDS)
278+
Assert.assertTrue(followerClient.indices().exists(GetIndexRequest(followerIndexName2), RequestOptions.DEFAULT))
279+
}
280+
246281
fun `test stop replication with stale replication settings at leader cluster`() {
247282

248283
Assume.assumeFalse(SNAPSHOTS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())

0 commit comments

Comments
 (0)