Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;

private volatile ElasticsearchException fatalException;

ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
super(id, type, action, description, parentTask, headers);
Expand Down Expand Up @@ -373,7 +375,7 @@ private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable tas
long delay = computeDelay(currentRetry, params.getPollTimeout().getMillis());
scheduler.accept(TimeValue.timeValueMillis(delay), task);
} else {
markAsFailed(e);
fatalException = ExceptionsHelper.convertToElastic(e);
}
}

Expand Down Expand Up @@ -423,7 +425,7 @@ protected void onCancelled() {
}

protected boolean isStopped() {
return isCancelled() || isCompleted();
return fatalException != null || isCancelled() || isCompleted();
}

public ShardId getFollowShardId() {
Expand Down Expand Up @@ -467,7 +469,8 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
.stream()
.collect(
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
fatalException);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,30 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class TransportFollowStatsAction extends TransportTasksAction<
ShardFollowNodeTask,
FollowStatsAction.StatsRequest,
FollowStatsAction.StatsResponses, FollowStatsAction.StatsResponse> {

private final IndexNameExpressionResolver resolver;
private final CcrLicenseChecker ccrLicenseChecker;

@Inject
Expand All @@ -46,7 +44,6 @@ public TransportFollowStatsAction(
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver resolver,
final CcrLicenseChecker ccrLicenseChecker) {
super(
settings,
Expand All @@ -57,7 +54,6 @@ public TransportFollowStatsAction(
FollowStatsAction.StatsRequest::new,
FollowStatsAction.StatsResponses::new,
Ccr.CCR_THREAD_POOL_NAME);
this.resolver = Objects.requireNonNull(resolver);
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
}

Expand Down Expand Up @@ -90,11 +86,19 @@ protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in)
@Override
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
})
.collect(Collectors.toSet());

for (final Task task : taskManager.getTasks().values()) {
if (task instanceof ShardFollowNodeTask) {
final ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
if (concreteIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
if (followerIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
operation.accept(shardFollowNodeTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand All @@ -21,8 +20,10 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;

public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {

Expand All @@ -48,18 +49,32 @@ protected void doExecute(
final ActionListener<AcknowledgedResponse> listener) {

client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.getFollowIndex());
if (followIndexMetadata == null) {
listener.onFailure(new IllegalArgumentException("follow index [" + request.getFollowIndex() + "] does not exist"));
PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}

final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
.filter(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
})
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
.collect(Collectors.toList());

if (shardFollowTaskIds.isEmpty()) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}

final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size());
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(shardFollowTaskIds.size());
int i = 0;

for (String taskId : shardFollowTaskIds) {
final int shardId = i++;
persistentTasksService.sendRemoveRequest(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,17 @@ public void testDeleteLeaderIndex() throws Exception {
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));

client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}

Expand All @@ -664,6 +675,17 @@ public void testDeleteFollowerIndex() throws Exception {

client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions(),
randomLong());
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

public class ShardFollowNodeTaskTests extends ESTestCase {

private Exception fatalError;
private List<long[]> shardChangesRequests;
private List<List<Translog.Operation>> bulkShardOperationRequests;
private BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> task.run();
Expand Down Expand Up @@ -345,7 +344,7 @@ public void testReceiveNonRetryableError() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

assertTrue("task is stopped", task.isStopped());
assertThat(fatalError, sameInstance(failure));
assertThat(task.getStatus().getFatalException().getRootCause(), sameInstance(failure));
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
Expand Down Expand Up @@ -791,19 +790,13 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con

@Override
protected boolean isStopped() {
return stopped.get();
return super.isStopped() || stopped.get();
}

@Override
public void markAsCompleted() {
stopped.set(true);
}

@Override
public void markAsFailed(Exception e) {
fatalError = e;
stopped.set(true);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
Expand Down Expand Up @@ -46,7 +47,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -180,7 +180,8 @@ public void testChangeLeaderHistoryUUID() throws Exception {

assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "]"));
});
}
Expand Down Expand Up @@ -221,7 +222,8 @@ public void testChangeFollowerHistoryUUID() throws Exception {

assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
});
}
Expand Down Expand Up @@ -325,7 +327,6 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup,

BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
AtomicBoolean stopped = new AtomicBoolean(false);
AtomicReference<Exception> failureHolder = new AtomicReference<>();
LongSet fetchOperations = new LongHashSet();
return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
Expand Down Expand Up @@ -403,24 +404,14 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co

@Override
protected boolean isStopped() {
return stopped.get();
return super.isStopped() || stopped.get();
}

@Override
public void markAsCompleted() {
stopped.set(true);
}

@Override
public void markAsFailed(Exception e) {
failureHolder.set(e);
stopped.set(true);
}

@Override
public Exception getFailure() {
return failureHolder.get();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
Expand Down Expand Up @@ -48,7 +49,8 @@ protected FollowStatsAction.StatsResponses createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong());
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
responses.add(new FollowStatsAction.StatsResponse(status));
}
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void testToXContent() throws IOException {
numberOfFailedBulkOperations,
numberOfOperationsIndexed,
fetchExceptions,
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
new ElasticsearchException("fatal error"));
final FollowStatsMonitoringDoc document = new FollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
assertThat(
Expand Down Expand Up @@ -181,7 +182,8 @@ public void testToXContent() throws IOException {
+ "}"
+ "}"
+ "],"
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis + ","
+ "\"fatal_exception\":{\"type\":\"exception\",\"reason\":\"fatal error\"}"
+ "}"
+ "}"));
}
Expand Down Expand Up @@ -212,7 +214,8 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
0,
10,
fetchExceptions,
2);
2,
null);
XContentBuilder builder = jsonBuilder();
builder.value(status);
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
Expand Down
Loading