Skip to content

Commit f6a1ccc

Browse files
authored
[6.8] Clean up ShardFollowTasks for deleted indices (#44702) (#44793)
Deleting a follower index does not delete its ShardFollowTasks, potentially leaving many persistent tasks in the cluster that cannot be allocated on nodes and that unnecessarily fill the logs. This commit adds a cluster state listener (ShardFollowTaskCleaner) that completes (with a failure) any persistent task that refers to a non-existent follower index. I think that this bug was introduced by #34404: before that change the task would have been completed as failed and removed from the cluster state. Backport of #44702 and #44801 to 6.8.
1 parent c3cec6c commit f6a1ccc

File tree

4 files changed

+133
-21
lines changed

4 files changed

+133
-21
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.xpack.ccr.action.CcrRequests;
5353
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
5454
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
55+
import org.elasticsearch.xpack.ccr.action.ShardFollowTaskCleaner;
5556
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
5657
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
5758
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
@@ -191,6 +192,7 @@ public Collection<Object> createComponents(
191192
ccrLicenseChecker,
192193
restoreSourceService,
193194
new CcrRepositoryManager(settings, clusterService, (NodeClient) client),
195+
new ShardFollowTaskCleaner(clusterService, threadPool, client),
194196
new AutoFollowCoordinator(
195197
settings,
196198
client,
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.client.Client;
13+
import org.elasticsearch.cluster.ClusterChangedEvent;
14+
import org.elasticsearch.cluster.ClusterStateListener;
15+
import org.elasticsearch.cluster.metadata.MetaData;
16+
import org.elasticsearch.cluster.service.ClusterService;
17+
import org.elasticsearch.gateway.GatewayService;
18+
import org.elasticsearch.index.Index;
19+
import org.elasticsearch.index.IndexNotFoundException;
20+
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
21+
import org.elasticsearch.persistent.PersistentTaskResponse;
22+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
25+
/**
26+
* A {@link ClusterStateListener} that completes any {@link ShardFollowTask} which concerns a deleted index.
27+
*/
28+
public class ShardFollowTaskCleaner implements ClusterStateListener {
29+
30+
private static final Logger logger = LogManager.getLogger(ShardFollowTaskCleaner.class);
31+
32+
private final ThreadPool threadPool;
33+
private final Client client;
34+
35+
public ShardFollowTaskCleaner(final ClusterService clusterService, final ThreadPool threadPool, final Client client) {
36+
this.threadPool = threadPool;
37+
this.client = client;
38+
clusterService.addListener(this);
39+
}
40+
41+
@Override
42+
public void clusterChanged(final ClusterChangedEvent event) {
43+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
44+
return;
45+
}
46+
if (event.localNodeMaster() == false) {
47+
return;
48+
}
49+
50+
MetaData metaData = event.state().metaData();
51+
PersistentTasksCustomMetaData persistentTasksMetaData = metaData.custom(PersistentTasksCustomMetaData.TYPE);
52+
if (persistentTasksMetaData == null) {
53+
return;
54+
}
55+
for (PersistentTasksCustomMetaData.PersistentTask<?> persistentTask : persistentTasksMetaData.tasks()) {
56+
if (ShardFollowTask.NAME.equals(persistentTask.getTaskName()) == false) {
57+
// this task is not a shard follow task
58+
continue;
59+
}
60+
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
61+
Index followerIndex = shardFollowTask.getFollowShardId().getIndex();
62+
if (metaData.index(followerIndex) != null) {
63+
// the index exists, do not clean this persistent task
64+
continue;
65+
}
66+
IndexNotFoundException e = new IndexNotFoundException(followerIndex);
67+
CompletionPersistentTaskAction.Request request =
68+
new CompletionPersistentTaskAction.Request(persistentTask.getId(), persistentTask.getAllocationId(), e);
69+
threadPool.generic().submit(() -> {
70+
client.execute(CompletionPersistentTaskAction.INSTANCE, request, new ActionListener<PersistentTaskResponse>() {
71+
72+
@Override
73+
public void onResponse(PersistentTaskResponse persistentTaskResponse) {
74+
logger.debug("task [{}] cleaned up", persistentTask.getId());
75+
}
76+
77+
@Override
78+
public void onFailure(Exception e) {
79+
logger.warn("failed to clean up task [{}]", persistentTask.getId());
80+
}
81+
});
82+
});
83+
}
84+
}
85+
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3131
import static org.elasticsearch.xpack.ccr.LocalIndexFollowingIT.getIndexSettings;
3232
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.hasSize;
3334
import static org.hamcrest.collection.IsEmptyCollection.empty;
3435

3536
/*
@@ -149,9 +150,8 @@ public void testFollowStatsApiResourceNotFound() throws Exception {
149150
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
150151
}
151152

152-
public void testFollowStatsApiIncludeShardFollowStatsWithRemovedFollowerIndex() throws Exception {
153-
final String leaderIndexSettings = getIndexSettings(1, 0,
154-
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
153+
public void testFollowStatsApiWithDeletedFollowerIndex() throws Exception {
154+
final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
155155
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
156156
ensureGreen("leader1");
157157

@@ -171,18 +171,11 @@ public void testFollowStatsApiIncludeShardFollowStatsWithRemovedFollowerIndex()
171171

172172
assertAcked(client().admin().indices().delete(new DeleteIndexRequest("follower1")).actionGet());
173173

174-
statsRequest = new FollowStatsAction.StatsRequest();
175-
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
176-
assertThat(response.getStatsResponses().size(), equalTo(1));
177-
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
178-
179-
statsRequest = new FollowStatsAction.StatsRequest();
180-
statsRequest.setIndices(new String[] {"follower1"});
181-
response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
182-
assertThat(response.getStatsResponses().size(), equalTo(1));
183-
assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1"));
184-
185-
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
174+
assertBusy(() -> {
175+
FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
176+
FollowStatsAction.StatsResponses statsResponse = client().execute(FollowStatsAction.INSTANCE, request).actionGet();
177+
assertThat(statsResponse.getStatsResponses(), hasSize(0));
178+
});
186179
}
187180

188181
public void testFollowStatsApiIncludeShardFollowStatsWithClosedFollowerIndex() throws Exception {

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -718,13 +718,14 @@ public void testDeleteFollowerIndex() throws Exception {
718718
StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
719719
assertThat(response.getNodeFailures(), empty());
720720
assertThat(response.getTaskFailures(), empty());
721-
assertThat(response.getStatsResponses(), hasSize(1));
722-
assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L));
723-
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
724-
assertThat(fatalException, notNullValue());
725-
assertThat(fatalException.getMessage(), equalTo("no such index"));
721+
if (response.getStatsResponses().isEmpty() == false) {
722+
assertThat(response.getStatsResponses(), hasSize(1));
723+
assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L));
724+
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
725+
assertThat(fatalException, notNullValue());
726+
assertThat(fatalException.getMessage(), equalTo("no such index"));
727+
}
726728
});
727-
pauseFollow("index2");
728729
ensureNoCcrTasks();
729730
}
730731

@@ -1274,6 +1275,37 @@ public void testUpdateRemoteConfigsDuringFollowing() throws Exception {
12741275
}
12751276
}
12761277

1278+
public void testCleanUpShardFollowTasksForDeletedIndices() throws Exception {
1279+
final int numberOfShards = randomIntBetween(1, 10);
1280+
assertAcked(leaderClient().admin().indices().prepareCreate("index1")
1281+
.setSettings(Settings.builder()
1282+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
1283+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards)
1284+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1))
1285+
.build()));
1286+
1287+
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
1288+
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
1289+
1290+
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
1291+
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(1L)));
1292+
1293+
assertBusy(() -> {
1294+
String action = ShardFollowTask.NAME + "[c]";
1295+
ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get();
1296+
assertThat(listTasksResponse.getTasks(), hasSize(numberOfShards));
1297+
});
1298+
1299+
assertAcked(followerClient().admin().indices().prepareDelete("index2"));
1300+
1301+
assertBusy(() -> {
1302+
String action = ShardFollowTask.NAME + "[c]";
1303+
ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get();
1304+
assertThat(listTasksResponse.getTasks(), hasSize(0));
1305+
});
1306+
ensureNoCcrTasks();
1307+
}
1308+
12771309
private long getFollowTaskSettingsVersion(String followerIndex) {
12781310
long settingsVersion = -1L;
12791311
for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {

0 commit comments

Comments
 (0)