Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTaskCleaner;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
Expand Down Expand Up @@ -182,6 +183,7 @@ public Collection<Object> createComponents(
ccrLicenseChecker,
restoreSourceService,
new CcrRepositoryManager(settings, clusterService, client),
new ShardFollowTaskCleaner(clusterService, threadPool, client),
new AutoFollowCoordinator(
settings,
client,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.PersistentTaskResponse;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;

/**
* A {@link ClusterStateListener} that completes any {@link ShardFollowTask} which concerns tao deleted index.
*/
public class ShardFollowTaskCleaner implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(ShardFollowTaskCleaner.class);

private final ThreadPool threadPool;
private final Client client;

public ShardFollowTaskCleaner(final ClusterService clusterService, final ThreadPool threadPool, final Client client) {
this.threadPool = threadPool;
this.client = client;
clusterService.addListener(this);
}

@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return;
}
if (event.localNodeMaster() == false) {
return;
}

MetaData metaData = event.state().metaData();
PersistentTasksCustomMetaData persistentTasksMetaData = metaData.custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
return;
}
for (PersistentTasksCustomMetaData.PersistentTask<?> persistentTask : persistentTasksMetaData.tasks()) {
if (ShardFollowTask.NAME.equals(persistentTask.getTaskName()) == false) {
// this task is not a shard follow task
continue;
}
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
Index followerIndex = shardFollowTask.getFollowShardId().getIndex();
if (metaData.index(followerIndex) != null) {
// the index exists, do not clean this persistent task
continue;
}
IndexNotFoundException e = new IndexNotFoundException(followerIndex);
CompletionPersistentTaskAction.Request request =
new CompletionPersistentTaskAction.Request(persistentTask.getId(), persistentTask.getAllocationId(), e);
threadPool.generic().submit(() -> {
client.execute(CompletionPersistentTaskAction.INSTANCE, request, new ActionListener<>() {

@Override
public void onResponse(PersistentTaskResponse persistentTaskResponse) {
logger.debug("task [{}] cleaned up", persistentTask.getId());
}

@Override
public void onFailure(Exception e) {
logger.warn("failed to clean up task [{}]", persistentTask.getId());
}
});
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -761,13 +761,14 @@ public void testDeleteFollowerIndex() throws Exception {
StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index [index2]"));
if (response.getStatsResponses().isEmpty() == false) {
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index [index2]"));
}
});
pauseFollow("index2");
ensureNoCcrTasks();
}

Expand Down Expand Up @@ -1318,6 +1319,37 @@ public void testUpdateRemoteConfigsDuringFollowing() throws Exception {
}
}

public void testCleanUpShardFollowTasksForDeletedIndices() throws Exception {
final int numberOfShards = randomIntBetween(1, 10);
assertAcked(leaderClient().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1))
.build()));

final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();

leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));

assertBusy(() -> {
String action = ShardFollowTask.NAME + "[c]";
ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get();
assertThat(listTasksResponse.getTasks(), hasSize(numberOfShards));
});

assertAcked(followerClient().admin().indices().prepareDelete("index2"));

assertBusy(() -> {
String action = ShardFollowTask.NAME + "[c]";
ListTasksResponse listTasksResponse = followerClient().admin().cluster().prepareListTasks().setActions(action).get();
assertThat(listTasksResponse.getTasks(), hasSize(0));
});
ensureNoCcrTasks();
}

private long getFollowTaskSettingsVersion(String followerIndex) {
long settingsVersion = -1L;
for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {
Expand Down