diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 13749c2abb6d4..5ca26b5e2d48b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -52,6 +52,7 @@ import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.ccr.action.ShardFollowTaskCleaner; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; @@ -182,6 +183,7 @@ public Collection createComponents( ccrLicenseChecker, restoreSourceService, new CcrRepositoryManager(settings, clusterService, client), + new ShardFollowTaskCleaner(clusterService, threadPool, client), new AutoFollowCoordinator( settings, client, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java new file mode 100644 index 0000000000000..af704b6741727 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.persistent.CompletionPersistentTaskAction; +import org.elasticsearch.persistent.PersistentTaskResponse; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; + +/** + * A {@link ClusterStateListener} that completes any {@link ShardFollowTask} which concerns a deleted index. + */ +public class ShardFollowTaskCleaner implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(ShardFollowTaskCleaner.class); + + private final ThreadPool threadPool; + private final Client client; + + public ShardFollowTaskCleaner(final ClusterService clusterService, final ThreadPool threadPool, final Client client) { + this.threadPool = threadPool; + this.client = client; + clusterService.addListener(this); + } + + @Override + public void clusterChanged(final ClusterChangedEvent event) { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } + if (event.localNodeMaster() == false) { + return; + } + + MetaData metaData = event.state().metaData(); + PersistentTasksCustomMetaData persistentTasksMetaData = metaData.custom(PersistentTasksCustomMetaData.TYPE); + if (persistentTasksMetaData == null) { + return; + } + for (PersistentTasksCustomMetaData.PersistentTask persistentTask : persistentTasksMetaData.tasks()) { + if (ShardFollowTask.NAME.equals(persistentTask.getTaskName()) == false) { + // this task is not a shard follow task + continue; + } + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); + Index followerIndex = shardFollowTask.getFollowShardId().getIndex(); + if (metaData.index(followerIndex) != null) { + // the index exists, do not clean this persistent task + continue; + } + IndexNotFoundException e = new IndexNotFoundException(followerIndex); + CompletionPersistentTaskAction.Request request = + new CompletionPersistentTaskAction.Request(persistentTask.getId(), persistentTask.getAllocationId(), e); + threadPool.generic().submit(() -> { + client.execute(CompletionPersistentTaskAction.INSTANCE, request, new ActionListener<>() { + + @Override + public void onResponse(PersistentTaskResponse persistentTaskResponse) { + logger.debug("task [{}] cleaned up", persistentTask.getId()); + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to clean up task [{}]", persistentTask.getId()); + } + }); + }); + } + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index a2d08c62d5bc0..89a5e8045c17e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -761,13 +761,14 @@ public void testDeleteFollowerIndex() throws Exception { StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet(); assertThat(response.getNodeFailures(), empty()); assertThat(response.getTaskFailures(), empty()); - assertThat(response.getStatsResponses(), hasSize(1)); - assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L)); - ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException(); - assertThat(fatalException, notNullValue()); - assertThat(fatalException.getMessage(), equalTo("no such index [index2]")); + if (response.getStatsResponses().isEmpty() == false) { + assertThat(response.getStatsResponses(), hasSize(1)); + assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L)); + ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException(); + assertThat(fatalException, notNullValue()); + assertThat(fatalException.getMessage(), equalTo("no such index [index2]")); + } }); - pauseFollow("index2"); ensureNoCcrTasks(); } @@ -1318,6 +1319,37 @@ public void testUpdateRemoteConfigsDuringFollowing() throws Exception { } } + public void testCleanUpShardFollowTasksForDeletedIndices() throws Exception { + final int numberOfShards = randomIntBetween(1, 10); + assertAcked(leaderClient().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1)) + .build())); + + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L))); + + assertBusy(() -> { + String action = ShardFollowTask.NAME + "[c]"; + ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get(); + assertThat(listTasksResponse.getTasks(), hasSize(numberOfShards)); + }); + + assertAcked(followerClient().admin().indices().prepareDelete("index2")); + + assertBusy(() -> { + String action = ShardFollowTask.NAME + "[c]"; + ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get(); + assertThat(listTasksResponse.getTasks(), hasSize(0)); + }); + ensureNoCcrTasks(); + } + private long getFollowTaskSettingsVersion(String followerIndex) { long settingsVersion = -1L; for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {