Skip to content

Commit 0796995

Browse files
martijnvg authored and kcm committed
[CCR] Don't fail shard follow tasks in case of a non-retryable error (#34404)
1 parent e16f143 commit 0796995

File tree

10 files changed

+116
-54
lines changed

10 files changed

+116
-54
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
8585
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
8686
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;
8787

88+
private volatile ElasticsearchException fatalException;
89+
8890
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
8991
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
9092
super(id, type, action, description, parentTask, headers);
@@ -373,7 +375,7 @@ private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable tas
373375
long delay = computeDelay(currentRetry, params.getPollTimeout().getMillis());
374376
scheduler.accept(TimeValue.timeValueMillis(delay), task);
375377
} else {
376-
markAsFailed(e);
378+
fatalException = ExceptionsHelper.convertToElastic(e);
377379
}
378380
}
379381

@@ -423,7 +425,7 @@ protected void onCancelled() {
423425
}
424426

425427
protected boolean isStopped() {
426-
return isCancelled() || isCompleted();
428+
return fatalException != null || isCancelled() || isCompleted();
427429
}
428430

429431
public ShardId getFollowShardId() {
@@ -467,7 +469,8 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
467469
.stream()
468470
.collect(
469471
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
470-
timeSinceLastFetchMillis);
472+
timeSinceLastFetchMillis,
473+
fatalException);
471474
}
472475

473476
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -12,32 +12,30 @@
1212
import org.elasticsearch.action.support.ActionFilters;
1313
import org.elasticsearch.action.support.tasks.TransportTasksAction;
1414
import org.elasticsearch.cluster.ClusterState;
15-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1615
import org.elasticsearch.cluster.service.ClusterService;
1716
import org.elasticsearch.common.inject.Inject;
1817
import org.elasticsearch.common.io.stream.StreamInput;
1918
import org.elasticsearch.common.settings.Settings;
2019
import org.elasticsearch.license.LicenseUtils;
20+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2121
import org.elasticsearch.tasks.Task;
2222
import org.elasticsearch.transport.TransportService;
2323
import org.elasticsearch.xpack.ccr.Ccr;
2424
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
2525
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
2626

2727
import java.io.IOException;
28-
import java.util.Arrays;
29-
import java.util.HashSet;
3028
import java.util.List;
3129
import java.util.Objects;
3230
import java.util.Set;
3331
import java.util.function.Consumer;
32+
import java.util.stream.Collectors;
3433

3534
public class TransportFollowStatsAction extends TransportTasksAction<
3635
ShardFollowNodeTask,
3736
FollowStatsAction.StatsRequest,
3837
FollowStatsAction.StatsResponses, FollowStatsAction.StatsResponse> {
3938

40-
private final IndexNameExpressionResolver resolver;
4139
private final CcrLicenseChecker ccrLicenseChecker;
4240

4341
@Inject
@@ -46,7 +44,6 @@ public TransportFollowStatsAction(
4644
final ClusterService clusterService,
4745
final TransportService transportService,
4846
final ActionFilters actionFilters,
49-
final IndexNameExpressionResolver resolver,
5047
final CcrLicenseChecker ccrLicenseChecker) {
5148
super(
5249
settings,
@@ -57,7 +54,6 @@ public TransportFollowStatsAction(
5754
FollowStatsAction.StatsRequest::new,
5855
FollowStatsAction.StatsResponses::new,
5956
Ccr.CCR_THREAD_POOL_NAME);
60-
this.resolver = Objects.requireNonNull(resolver);
6157
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
6258
}
6359

@@ -90,11 +86,19 @@ protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in)
9086
@Override
9187
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
9288
final ClusterState state = clusterService.state();
93-
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
89+
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
90+
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
91+
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
92+
.map(persistentTask -> {
93+
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
94+
return shardFollowTask.getFollowShardId().getIndexName();
95+
})
96+
.collect(Collectors.toSet());
97+
9498
for (final Task task : taskManager.getTasks().values()) {
9599
if (task instanceof ShardFollowNodeTask) {
96100
final ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
97-
if (concreteIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
101+
if (followerIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
98102
operation.accept(shardFollowNodeTask);
99103
}
100104
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPauseFollowAction.java

Lines changed: 25 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.support.HandledTransportAction;
1313
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1414
import org.elasticsearch.client.Client;
15-
import org.elasticsearch.cluster.metadata.IndexMetaData;
1615
import org.elasticsearch.common.inject.Inject;
1716
import org.elasticsearch.common.settings.Settings;
1817
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -21,8 +20,10 @@
2120
import org.elasticsearch.transport.TransportService;
2221
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
2322

23+
import java.util.List;
2424
import java.util.concurrent.atomic.AtomicInteger;
2525
import java.util.concurrent.atomic.AtomicReferenceArray;
26+
import java.util.stream.Collectors;
2627

2728
public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {
2829

@@ -48,18 +49,32 @@ protected void doExecute(
4849
final ActionListener<AcknowledgedResponse> listener) {
4950

5051
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
51-
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.getFollowIndex());
52-
if (followIndexMetadata == null) {
53-
listener.onFailure(new IllegalArgumentException("follow index [" + request.getFollowIndex() + "] does not exist"));
52+
PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE);
53+
if (persistentTasksMetaData == null) {
54+
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
5455
return;
5556
}
5657

57-
final int numShards = followIndexMetadata.getNumberOfShards();
58-
final AtomicInteger counter = new AtomicInteger(numShards);
59-
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
60-
for (int i = 0; i < numShards; i++) {
61-
final int shardId = i;
62-
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
58+
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
59+
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
60+
.filter(persistentTask -> {
61+
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
62+
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
63+
})
64+
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
65+
.collect(Collectors.toList());
66+
67+
if (shardFollowTaskIds.isEmpty()) {
68+
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
69+
return;
70+
}
71+
72+
final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size());
73+
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(shardFollowTaskIds.size());
74+
int i = 0;
75+
76+
for (String taskId : shardFollowTaskIds) {
77+
final int shardId = i++;
6378
persistentTasksService.sendRemoveRequest(taskId,
6479
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
6580
@Override

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -647,6 +647,17 @@ public void testDeleteLeaderIndex() throws Exception {
647647
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
648648

649649
client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
650+
assertBusy(() -> {
651+
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
652+
assertThat(response.getNodeFailures(), empty());
653+
assertThat(response.getTaskFailures(), empty());
654+
assertThat(response.getStatsResponses(), hasSize(1));
655+
assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
656+
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
657+
assertThat(fatalException, notNullValue());
658+
assertThat(fatalException.getMessage(), equalTo("no such index"));
659+
});
660+
unfollowIndex("index2");
650661
ensureNoCcrTasks();
651662
}
652663

@@ -666,6 +677,17 @@ public void testDeleteFollowerIndex() throws Exception {
666677

667678
client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
668679
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
680+
assertBusy(() -> {
681+
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
682+
assertThat(response.getNodeFailures(), empty());
683+
assertThat(response.getTaskFailures(), empty());
684+
assertThat(response.getStatsResponses(), hasSize(1));
685+
assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
686+
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
687+
assertThat(fatalException, notNullValue());
688+
assertThat(fatalException.getMessage(), equalTo("no such index"));
689+
});
690+
unfollowIndex("index2");
669691
ensureNoCcrTasks();
670692
}
671693

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,8 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
5656
randomNonNegativeLong(),
5757
randomNonNegativeLong(),
5858
randomReadExceptions(),
59-
randomLong());
59+
randomLong(),
60+
randomBoolean() ? new ElasticsearchException("fatal error") : null);
6061
}
6162

6263
@Override

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 2 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,6 @@
4545

4646
public class ShardFollowNodeTaskTests extends ESTestCase {
4747

48-
private Exception fatalError;
4948
private List<long[]> shardChangesRequests;
5049
private List<List<Translog.Operation>> bulkShardOperationRequests;
5150
private BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> task.run();
@@ -345,7 +344,7 @@ public void testReceiveNonRetryableError() {
345344
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
346345

347346
assertTrue("task is stopped", task.isStopped());
348-
assertThat(fatalError, sameInstance(failure));
347+
assertThat(task.getStatus().getFatalException().getRootCause(), sameInstance(failure));
349348
ShardFollowNodeTaskStatus status = task.getStatus();
350349
assertThat(status.numberOfConcurrentReads(), equalTo(1));
351350
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
@@ -791,19 +790,13 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con
791790

792791
@Override
793792
protected boolean isStopped() {
794-
return stopped.get();
793+
return super.isStopped() || stopped.get();
795794
}
796795

797796
@Override
798797
public void markAsCompleted() {
799798
stopped.set(true);
800799
}
801-
802-
@Override
803-
public void markAsFailed(Exception e) {
804-
fatalError = e;
805-
stopped.set(true);
806-
}
807800
};
808801
}
809802

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 6 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77

88
import com.carrotsearch.hppc.LongHashSet;
99
import com.carrotsearch.hppc.LongSet;
10+
import org.elasticsearch.ElasticsearchException;
1011
import org.elasticsearch.Version;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.DocWriteResponse;
@@ -46,7 +47,6 @@
4647
import java.util.List;
4748
import java.util.Set;
4849
import java.util.concurrent.atomic.AtomicBoolean;
49-
import java.util.concurrent.atomic.AtomicReference;
5050
import java.util.function.BiConsumer;
5151
import java.util.function.Consumer;
5252
import java.util.function.LongConsumer;
@@ -180,7 +180,8 @@ public void testChangeLeaderHistoryUUID() throws Exception {
180180

181181
assertBusy(() -> {
182182
assertThat(shardFollowTask.isStopped(), is(true));
183-
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
183+
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
184+
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
184185
"], actual [" + newHistoryUUID + "]"));
185186
});
186187
}
@@ -221,7 +222,8 @@ public void testChangeFollowerHistoryUUID() throws Exception {
221222

222223
assertBusy(() -> {
223224
assertThat(shardFollowTask.isStopped(), is(true));
224-
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
225+
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
226+
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
225227
"], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
226228
});
227229
}
@@ -325,7 +327,6 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup,
325327

326328
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
327329
AtomicBoolean stopped = new AtomicBoolean(false);
328-
AtomicReference<Exception> failureHolder = new AtomicReference<>();
329330
LongSet fetchOperations = new LongHashSet();
330331
return new ShardFollowNodeTask(
331332
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
@@ -403,24 +404,14 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
403404

404405
@Override
405406
protected boolean isStopped() {
406-
return stopped.get();
407+
return super.isStopped() || stopped.get();
407408
}
408409

409410
@Override
410411
public void markAsCompleted() {
411412
stopped.set(true);
412413
}
413414

414-
@Override
415-
public void markAsFailed(Exception e) {
416-
failureHolder.set(e);
417-
stopped.set(true);
418-
}
419-
420-
@Override
421-
public Exception getFailure() {
422-
return failureHolder.get();
423-
}
424415
};
425416
}
426417

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.ccr.action;
77

8+
import org.elasticsearch.ElasticsearchException;
89
import org.elasticsearch.test.AbstractStreamableTestCase;
910
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
1011
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
@@ -48,7 +49,8 @@ protected FollowStatsAction.StatsResponses createTestInstance() {
4849
randomNonNegativeLong(),
4950
randomNonNegativeLong(),
5051
Collections.emptyNavigableMap(),
51-
randomLong());
52+
randomLong(),
53+
randomBoolean() ? new ElasticsearchException("fatal error") : null);
5254
responses.add(new FollowStatsAction.StatsResponse(status));
5355
}
5456
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,8 @@ public void testToXContent() throws IOException {
130130
numberOfFailedBulkOperations,
131131
numberOfOperationsIndexed,
132132
fetchExceptions,
133-
timeSinceLastFetchMillis);
133+
timeSinceLastFetchMillis,
134+
new ElasticsearchException("fatal error"));
134135
final FollowStatsMonitoringDoc document = new FollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
135136
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
136137
assertThat(
@@ -181,7 +182,8 @@ public void testToXContent() throws IOException {
181182
+ "}"
182183
+ "}"
183184
+ "],"
184-
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis
185+
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis + ","
186+
+ "\"fatal_exception\":{\"type\":\"exception\",\"reason\":\"fatal error\"}"
185187
+ "}"
186188
+ "}"));
187189
}
@@ -212,7 +214,8 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
212214
0,
213215
10,
214216
fetchExceptions,
215-
2);
217+
2,
218+
null);
216219
XContentBuilder builder = jsonBuilder();
217220
builder.value(status);
218221
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);

0 commit comments

Comments
 (0)