From 1f6bf508369d6c76d8e2daf9eb3af7b9d689a54f Mon Sep 17 00:00:00 2001 From: Nhat Date: Sun, 29 Oct 2017 20:46:55 -0400 Subject: [PATCH 1/7] Snapshot: Use TransportMasterNodeAction to update Currently, we are using a plain TransportRequestHandler to post snapshot status messages to the master. However, it doesn't have a robust retry mechanism as TransportMasterNodeAction. This changes migrate from TransportRequestHandler to TransportMasterNodeAction. Most of code in TransportSnapshotUpdateStatusAction is copied from SnapshotShardsService. Serializing a MasterNodeRequest requires 8 bytes more than a TransportRequest. In order to maintain the BWC in a mixed cluster, we have to serialize/deserialize a MasterNodeRequest as a TransportRequest without timeout. Closes #27151 --- .../elasticsearch/action/ActionModule.java | 6 +- .../support/master/MasterNodeRequest.java | 18 ++ .../snapshots/SnapshotShardsService.java | 224 +++--------------- .../snapshots/SnapshotsService.java | 2 +- .../TransportSnapshotUpdateStatusAction.java | 200 ++++++++++++++++ .../snapshots/UpdateSnapshotStatusAction.java | 43 ++++ .../UpdateSnapshotStatusRequest.java | 97 ++++++++ .../UpdateSnapshotStatusRequestBuilder.java | 30 +++ .../UpdateSnapshotStatusResponse.java | 26 ++ .../UpdateSnapshotStatusRequestTests.java | 80 +++++++ qa/mixed-cluster/build.gradle | 5 + .../elasticsearch/backwards/IndexingIT.java | 61 +++++ 12 files changed, 596 insertions(+), 196 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/snapshots/TransportSnapshotUpdateStatusAction.java create mode 100644 core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusAction.java create mode 100644 core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequest.java create mode 100644 core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusResponse.java create mode 100644 core/src/test/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestTests.java diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index 86582e9b8d046..c420cadf41fff 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -181,7 +181,6 @@ import org.elasticsearch.action.search.TransportMultiSearchAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; -import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; @@ -199,7 +198,6 @@ import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; -import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -311,6 +309,8 @@ import org.elasticsearch.rest.action.search.RestMultiSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction; +import org.elasticsearch.snapshots.TransportSnapshotUpdateStatusAction; +import org.elasticsearch.snapshots.UpdateSnapshotStatusAction; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.usage.UsageService; @@ -324,7 +324,6 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableMap; /** @@ -432,6 +431,7 @@ public void reg actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); + actions.register(UpdateSnapshotStatusAction.INSTANCE, TransportSnapshotUpdateStatusAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java b/core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java index 2bad309f1cc3b..f7dec3e3adacc 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java @@ -76,4 +76,22 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); masterNodeTimeout = new TimeValue(in); } + + /** + * CAUTION: Use this method for the BWC purpose only. + * This method serializes a {@link MasterNodeRequest} as a {@link org.elasticsearch.transport.TransportRequest} + * without timeout. The master will have to use the default timeout setting. + */ + protected final void readFromAsTransportRequest(StreamInput in) throws IOException { + super.readFrom(in); + } + + /** + * CAUTION: Use this method for the BWC purpose only. + * This method deserializes a {@link MasterNodeRequest} from a {@link org.elasticsearch.transport.TransportRequest} + * without timeout. The master will have to use the default timeout setting. + */ + protected final void writeToAsTransportRequest(StreamOutput out) throws IOException { + super.writeTo(out); + } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 259136ca9ccfd..335fd0c8b12cd 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -23,23 +23,19 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.ClusterStateTaskConfig; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -57,17 +53,8 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; - -import java.io.IOException; -import java.util.ArrayList; + import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -79,15 +66,12 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; -import static org.elasticsearch.cluster.SnapshotsInProgress.completed; /** * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for * starting and stopping shard level snapshots */ -public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateApplier, IndexEventListener { - - public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot"; +public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { private final ClusterService clusterService; @@ -95,8 +79,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final SnapshotsService snapshotsService; - private final TransportService transportService; - private final ThreadPool threadPool; private final Lock shutdownLock = new ReentrantLock(); @@ -104,29 +86,21 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final Condition shutdownCondition = shutdownLock.newCondition(); private volatile Map shardSnapshots = emptyMap(); - - private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); + private final Client client; @Inject - public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, - TransportService transportService, IndicesService indicesService) { + public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, + ThreadPool threadPool, IndicesService indicesService, Client client) { super(settings); this.indicesService = indicesService; this.snapshotsService = snapshotsService; - this.transportService = transportService; this.clusterService = clusterService; this.threadPool = threadPool; + this.client = client; if (DiscoveryNode.isDataNode(settings)) { - // this is only useful on the nodes that can hold data - // addLowPriorityApplier to make sure that Repository will be created before snapshot - clusterService.addLowPriorityApplier(this); + // this is only useful on the nodes that can hold data. + clusterService.addListener(this); } - - if (DiscoveryNode.isMasterNode(settings)) { - // This needs to run only on nodes that can become masters - transportService.registerRequestHandler(UPDATE_SNAPSHOT_ACTION_NAME, UpdateIndexShardSnapshotStatusRequest::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandler()); - } - } @Override @@ -151,11 +125,11 @@ protected void doStop() { @Override protected void doClose() { - clusterService.removeApplier(this); + clusterService.removeListener(this); } @Override - public void applyClusterState(ClusterChangedEvent event) { + public void clusterChanged(ClusterChangedEvent event) { try { SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE); SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); @@ -235,7 +209,6 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { Map> newSnapshots = new HashMap<>(); // Now go through all snapshots and update existing or create missing final String localNodeId = event.state().nodes().getLocalNodeId(); - final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); final Map> snapshotIndices = new HashMap<>(); if (snapshotsInProgress != null) { for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { @@ -286,12 +259,12 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { case DONE: logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshot(), shard.key); updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + new ShardSnapshotStatus(localNodeId, State.SUCCESS)); break; case FAILURE: logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshot(), shard.key); updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new ShardSnapshotStatus(localNodeId, State.FAILED, snapshotStatus.failure()), masterNode); + new ShardSnapshotStatus(localNodeId, State.FAILED, snapshotStatus.failure())); break; default: throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage()); @@ -333,20 +306,20 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { public void doRun() { snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue()); updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + new ShardSnapshotStatus(localNodeId, State.SUCCESS)); } @Override public void onFailure(Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to create snapshot", shardId, entry.getKey()), e); updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode); + new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e))); } }); } catch (Exception e) { updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode); + new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e))); } } } @@ -385,9 +358,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina TimeValue.timeValueMillis(snapshotStatus.time()), sb); } } - } catch (SnapshotFailedEngineException e) { - throw e; - } catch (IndexShardSnapshotFailedException e) { + } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) { throw e; } catch (Exception e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e); @@ -403,7 +374,6 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { return; } final String localNodeId = event.state().nodes().getLocalNodeId(); - final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { Map localShards = currentSnapshotShards(snapshot.snapshot()); @@ -419,12 +389,12 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshot(), shardId); updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + new ShardSnapshotStatus(localNodeId, State.SUCCESS)); } else if (localShard.getValue().stage() == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshot(), shardId); updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, localShardStatus.failure()), masterNode); + new ShardSnapshotStatus(localNodeId, State.FAILED, localShardStatus.failure())); } } @@ -445,148 +415,18 @@ private SnapshotShards(Map shards) { } } - - /** - * Internal request that is used to send changes in snapshot status to master - */ - public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest { - private Snapshot snapshot; - private ShardId shardId; - private ShardSnapshotStatus status; - - public UpdateIndexShardSnapshotStatusRequest() { - - } - - public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { - this.snapshot = snapshot; - this.shardId = shardId; - this.status = status; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - snapshot = new Snapshot(in); - shardId = ShardId.readShardId(in); - status = new ShardSnapshotStatus(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - snapshot.writeTo(out); - shardId.writeTo(out); - status.writeTo(out); - } - - public Snapshot snapshot() { - return snapshot; - } - - public ShardId shardId() { - return shardId; - } - - public ShardSnapshotStatus status() { - return status; - } - - @Override - public String toString() { - return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; - } - } - - /** - * Updates the shard status - */ - public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); - try { - transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e); - } - } - - /** - * Updates the shard status on master node - * - * @param request update shard status request - */ - private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) { - logger.trace("received updated snapshot restore state [{}]", request); - clusterService.submitStateUpdateTask( - "update snapshot state", - request, - ClusterStateTaskConfig.build(Priority.NORMAL), - snapshotStateExecutor, - (source, e) -> logger.warn((Supplier) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", - request.snapshot(), request.shardId(), request.status()), e)); - } - - class SnapshotStateExecutor implements ClusterStateTaskExecutor { - - @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { - int changedCount = 0; - final List entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - boolean updated = false; - - for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { - if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); - if (updated == false) { - shards.putAll(entry.shards()); - updated = true; - } - shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); - changedCount++; - } - } - - if (updated) { - if (completed(shards.values()) == false) { - entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); - } else { - // Snapshot is finished - mark it as done - // TODO: Add PARTIAL_SUCCESS status? - SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); - entries.add(updatedEntry); - // Finalize snapshot in the repository - snapshotsService.endSnapshot(updatedEntry); - logger.info("snapshot [{}] is done", updatedEntry.snapshot()); - } - } else { - entries.add(entry); - } - } - if (changedCount > 0) { - logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - - final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterTasksResult.builder().successes(tasks).build( - ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()); - } + void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { + final UpdateSnapshotStatusRequest request = new UpdateSnapshotStatusRequest(snapshot, shardId, status); + client.admin().cluster().execute(UpdateSnapshotStatusAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(UpdateSnapshotStatusResponse updateSnapshotStatusResponse) { + logger.debug((Supplier) () -> new ParameterizedMessage("snapshot state is updated on the master [{}]", request)); } - return ClusterTasksResult.builder().successes(tasks).build(currentState); - } - } - /** - * Transport request handler that is used to send changes in snapshot status to master - */ - class UpdateSnapshotStateRequestHandler implements TransportRequestHandler { - @Override - public void messageReceived(UpdateIndexShardSnapshotStatusRequest request, final TransportChannel channel) throws Exception { - innerUpdateSnapshotState(request); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } + @Override + public void onFailure(Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("failed to send/update a snapshot state [{}]", request), e); + } + }); } - } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 037db4d5caf66..be55009cf7e28 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -97,7 +97,7 @@ *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method
  • *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#updateIndexShardSnapshotStatus} method
  • - *
  • When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed
  • + *
  • When last shard is completed master node in {@link TransportSnapshotUpdateStatusAction#innerUpdateSnapshotState} method marks the snapshot as completed
  • *
  • After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository, * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state
  • * diff --git a/core/src/main/java/org/elasticsearch/snapshots/TransportSnapshotUpdateStatusAction.java b/core/src/main/java/org/elasticsearch/snapshots/TransportSnapshotUpdateStatusAction.java new file mode 100644 index 0000000000000..2b950e2d2dfe7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/snapshots/TransportSnapshotUpdateStatusAction.java @@ -0,0 +1,200 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.NotMasterException; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.cluster.SnapshotsInProgress.completed; + +/** + * A {@link TransportSnapshotUpdateStatusAction} receives snapshot state messages from {@link SnapshotShardsService}, + * then computes and updates the {@link ClusterState}. + */ +public class TransportSnapshotUpdateStatusAction extends TransportMasterNodeAction { + private final SnapshotUpdateStateExecutor snapshotStateExecutor; + + @Inject + public TransportSnapshotUpdateStatusAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, + ActionFilters actionFilters, SnapshotsService snapshotsService) { + super(settings, UpdateSnapshotStatusAction.NAME, transportService, clusterService, + threadPool, actionFilters, indexNameExpressionResolver, UpdateSnapshotStatusRequest::new); + this.snapshotStateExecutor = new SnapshotUpdateStateExecutor(snapshotsService, logger); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected UpdateSnapshotStatusResponse newResponse() { + return new UpdateSnapshotStatusResponse(); + } + + @Override + protected void masterOperation(UpdateSnapshotStatusRequest request, ClusterState state, + ActionListener listener) throws Exception { + innerUpdateSnapshotState(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(UpdateSnapshotStatusRequest request, ClusterState state) { + return null; + } + + void innerUpdateSnapshotState(final UpdateSnapshotStatusRequest request, ActionListener listener) { + logger.trace((Supplier) () -> new ParameterizedMessage("received updated snapshot restore status [{}]", request)); + clusterService.submitStateUpdateTask( + "update snapshot state", + request, + ClusterStateTaskConfig.build(Priority.NORMAL), + snapshotStateExecutor, + new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage( + "unexpected failure while updating snapshot status [{}]", request), e); + try { + listener.onFailure(e); + } catch (Exception channelException) { + channelException.addSuppressed(e); + logger.warn((Supplier) () -> new ParameterizedMessage( + "failed to send failure [{}] while updating snapshot status [{}]", e, request), channelException); + } + } + + @Override + public void onNoLongerMaster(String source) { + logger.error("no longer master while updating snapshot status [{}]", request); + try { + listener.onFailure(new NotMasterException(source)); + } catch (Exception channelException) { + logger.warn((Supplier) () -> new ParameterizedMessage( + "{} failed to send no longer master updating snapshot[{}]", request.snapshot(), request), channelException); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + try { + listener.onResponse(new UpdateSnapshotStatusResponse()); + } catch (Exception channelException) { + logger.warn((Supplier) () -> new ParameterizedMessage( + "failed to send response after updating snapshot status [{}]", request), channelException); + } + } + } + ); + } + + // The client node sends the update message to the master, then the master node updates the ClusterState. + static class SnapshotUpdateStateExecutor implements ClusterStateTaskExecutor { + private final SnapshotsService snapshotsService; + private final Logger logger; + + SnapshotUpdateStateExecutor(SnapshotsService snapshotsService, Logger logger) { + this.snapshotsService = snapshotsService; + this.logger = logger; + } + + @Override + public ClusterTasksResult execute(ClusterState currentState, + List tasks) throws Exception { + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { + int changedCount = 0; + final List entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + boolean updated = false; + + for (UpdateSnapshotStatusRequest updateSnapshotState : tasks) { + if (entry.snapshot().equals(updateSnapshotState.snapshot())) { + if (logger.isTraceEnabled()) { + logger.trace("[{}] Updating shard [{}] with status [{}]", + updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); + } + if (updated == false) { + shards.putAll(entry.shards()); + updated = true; + } + shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); + changedCount++; + } + } + + if (updated) { + if (completed(shards.values()) == false) { + entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); + } else { + // Snapshot is finished - mark it as done + // TODO: Add PARTIAL_SUCCESS status? + SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, + SnapshotsInProgress.State.SUCCESS, shards.build()); + entries.add(updatedEntry); + // Finalize snapshot in the repository + snapshotsService.endSnapshot(updatedEntry); + logger.info("snapshot [{}] is done", updatedEntry.snapshot()); + } + } else { + entries.add(entry); + } + } + if (changedCount > 0) { + if (logger.isTraceEnabled()) { + logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); + } + final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress( + entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); + return ClusterTasksResult.builder().successes(tasks).build( + ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()); + } + } + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusAction.java b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusAction.java new file mode 100644 index 0000000000000..d8253e7bed316 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusAction.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class UpdateSnapshotStatusAction extends Action { + public static final UpdateSnapshotStatusAction INSTANCE = new UpdateSnapshotStatusAction(); + public static final String NAME = "internal:cluster/snapshot/update_snapshot"; + + public UpdateSnapshotStatusAction() { + super(NAME); + } + + @Override + public UpdateSnapshotStatusRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new UpdateSnapshotStatusRequestBuilder(client, UpdateSnapshotStatusAction.INSTANCE); + } + + @Override + public UpdateSnapshotStatusResponse newResponse() { + return new UpdateSnapshotStatusResponse(); + } +} diff --git a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequest.java b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequest.java new file mode 100644 index 0000000000000..4675c65f77228 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequest.java @@ -0,0 +1,97 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * Internal request that is used to send changes in snapshot status to master + */ +class UpdateSnapshotStatusRequest extends MasterNodeRequest { + private Snapshot snapshot; + private ShardId shardId; + private SnapshotsInProgress.ShardSnapshotStatus status; + + UpdateSnapshotStatusRequest() { + + } + + UpdateSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) { + this.snapshot = snapshot; + this.shardId = shardId; + this.status = status; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + // To keep BWC, we have to deserialize a MasterNodeRequest from a TransportRequest from older versions. + if (in.getVersion().before(Version.V_7_0_0_alpha1)) { + super.readFromAsTransportRequest(in); + } else { + super.readFrom(in); + } + snapshot = new Snapshot(in); + shardId = ShardId.readShardId(in); + status = new SnapshotsInProgress.ShardSnapshotStatus(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // To keep BWC, we have to serialize a MasterNodeRequest as a TransportRequest for older versions. + if (out.getVersion().before(Version.V_7_0_0_alpha1)) { + super.writeToAsTransportRequest(out); + } else { + super.writeTo(out); + } + snapshot.writeTo(out); + shardId.writeTo(out); + status.writeTo(out); + } + + Snapshot snapshot() { + return snapshot; + } + + ShardId shardId() { + return shardId; + } + + SnapshotsInProgress.ShardSnapshotStatus status() { + return status; + } + + @Override + public String toString() { + return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; + } +} diff --git a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestBuilder.java b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestBuilder.java new file mode 100644 index 0000000000000..40a40e7d510d7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestBuilder.java @@ -0,0 +1,30 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +class UpdateSnapshotStatusRequestBuilder extends MasterNodeOperationRequestBuilder { + UpdateSnapshotStatusRequestBuilder(ElasticsearchClient client, UpdateSnapshotStatusAction action) { + super(client, action, new UpdateSnapshotStatusRequest()); + } +} diff --git a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusResponse.java b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusResponse.java new file mode 100644 index 0000000000000..62b149843e3d7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusResponse.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ActionResponse; + +class UpdateSnapshotStatusResponse extends ActionResponse { + +} diff --git a/core/src/test/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestTests.java b/core/src/test/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestTests.java new file mode 100644 index 0000000000000..341e95037134f --- /dev/null +++ b/core/src/test/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestTests.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.UUID; + +import static org.hamcrest.Matchers.equalTo; + +public class UpdateSnapshotStatusRequestTests extends ESTestCase { + + public void testSerializeForOlderVersion() throws Exception { + UpdateSnapshotStatusRequest originalRequest = new UpdateSnapshotStatusRequest(randomSnapshot(), randomShardId(), randomStatus()); + Version bwcVersion = randomFrom(Version.V_5_1_2, Version.V_5_4_1, Version.V_6_0_0_alpha1); + final UpdateSnapshotStatusRequest cloneRequest = new UpdateSnapshotStatusRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(bwcVersion); + originalRequest.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(bwcVersion); + cloneRequest.readFrom(in); + } + } + assertThat(cloneRequest.snapshot(), equalTo(originalRequest.snapshot())); + assertThat(cloneRequest.shardId(), equalTo(originalRequest.shardId())); + assertThat(cloneRequest.status(), equalTo(originalRequest.status())); + } + + public void testSerializeForCurrentVersion() throws Exception { + UpdateSnapshotStatusRequest originalRequest = new UpdateSnapshotStatusRequest(randomSnapshot(), randomShardId(), randomStatus()); + final UpdateSnapshotStatusRequest cloneRequest = new UpdateSnapshotStatusRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + originalRequest.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + cloneRequest.readFrom(in); + } + } + assertThat(cloneRequest.snapshot(), equalTo(originalRequest.snapshot())); + assertThat(cloneRequest.shardId(), equalTo(originalRequest.shardId())); + assertThat(cloneRequest.status(), equalTo(originalRequest.status())); + } + + private ShardId randomShardId() { + return new ShardId(new Index(randomAlphaOfLength(10), UUID.randomUUID().toString()), between(0, 1000)); + } + + private Snapshot randomSnapshot() { + SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(10), UUID.randomUUID().toString()); + return new Snapshot(randomAlphaOfLength(10), snapshotId); + } + + private SnapshotsInProgress.ShardSnapshotStatus randomStatus() { + return new SnapshotsInProgress.ShardSnapshotStatus("node-" + between(0, 100), + randomFrom(SnapshotsInProgress.State.values()), randomAlphaOfLength(20)); + } +} diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 66185325931d8..a49df4d083980 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -54,6 +54,11 @@ for (Version version : wireCompatVersions) { if (project.bwc_tests_enabled) { bwcTest.dependsOn(versionBwcTest) } + + /* To support taking index snapshots, we have to set path.repo setting */ + tasks.getByName("${baseName}#mixedClusterTestRunner").configure { + systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo") + } } test.enabled = false // no unit tests for rolling upgrades, only the rest integration test diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index c6200417e39d8..e5fa0ccb8eb13 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; @@ -42,7 +43,9 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; public class IndexingIT extends ESRestTestCase { @@ -237,6 +240,64 @@ public void testSeqNoCheckpoints() throws Exception { } } + /** + * Previously, we use a TransportHandler to send/receive the snapshot state messages. + * This handler does not retry if there is failure. We decided to replace it by TransportMasterNodeAction which does retry if needed. + * + * This test makes sure that these two approaches are compatible to one another. + * In particular, the master node should receive and handle the snapshot state messages from old nodes properly. + */ + public void testUpdateSnapshotStates() throws Exception { + Nodes nodes = buildNodeAndVersions(); + assertThat(nodes.getNewNodes(), not(empty())); + logger.info("cluster discovered: {}", nodes.toString()); + + // Create the repository before taking the snapshot. + String repoConfig = JsonXContent.contentBuilder() + .startObject() + .field("type", "fs") + .startObject("settings") + .field("compress", randomBoolean()) + .field("location", System.getProperty("tests.path.repo")) + .endObject() + .endObject() + .string(); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo", emptyMap(), + new StringEntity(repoConfig, ContentType.APPLICATION_JSON)) + ); + + String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); + + // Allocating shards on the BWC nodes to makes sure that taking snapshot happens on those nodes. + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(5, 10)) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put("index.routing.allocation.include._name", bwcNames); + + final String index = "test-snapshot-index"; + createIndex(index, settings.build()); + indexDocs(index, 0, between(50, 100)); + ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo/bwc-snapshot", singletonMap("wait_for_completion", "true"), + new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON)) + ); + + // Allocating shards on all nodes, taking snapshots should happen on all nodes. + updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); + ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo/mixed-snapshot", singletonMap("wait_for_completion", "true"), + new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON)) + ); + } + private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference)); assertOK(response); From 37b805074ea09406eebaa25cde93fa4ae243cd85 Mon Sep 17 00:00:00 2001 From: Nhat Date: Sat, 4 Nov 2017 21:52:36 -0400 Subject: [PATCH 2/7] Revert "Snapshot: Use TransportMasterNodeAction to update" This reverts commit 1f6bf508369d6c76d8e2daf9eb3af7b9d689a54f. --- .../elasticsearch/action/ActionModule.java | 6 +- .../support/master/MasterNodeRequest.java | 18 -- .../snapshots/SnapshotShardsService.java | 224 +++++++++++++++--- .../snapshots/SnapshotsService.java | 2 +- .../TransportSnapshotUpdateStatusAction.java | 200 ---------------- .../snapshots/UpdateSnapshotStatusAction.java | 43 ---- .../UpdateSnapshotStatusRequest.java | 97 -------- .../UpdateSnapshotStatusRequestBuilder.java | 30 --- .../UpdateSnapshotStatusResponse.java | 26 -- .../UpdateSnapshotStatusRequestTests.java | 80 ------- qa/mixed-cluster/build.gradle | 5 - .../elasticsearch/backwards/IndexingIT.java | 61 ----- 12 files changed, 196 insertions(+), 596 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/snapshots/TransportSnapshotUpdateStatusAction.java delete mode 100644 core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusAction.java delete mode 100644 core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequest.java delete mode 100644 core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestBuilder.java delete mode 100644 core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusResponse.java delete mode 100644 core/src/test/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestTests.java diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index c420cadf41fff..86582e9b8d046 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -181,6 +181,7 @@ import org.elasticsearch.action.search.TransportMultiSearchAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; +import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; @@ -198,6 +199,7 @@ import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; +import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -309,8 +311,6 @@ import org.elasticsearch.rest.action.search.RestMultiSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction; -import org.elasticsearch.snapshots.TransportSnapshotUpdateStatusAction; -import org.elasticsearch.snapshots.UpdateSnapshotStatusAction; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.usage.UsageService; @@ -324,6 +324,7 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableMap; /** @@ -431,7 +432,6 @@ public void reg actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); - actions.register(UpdateSnapshotStatusAction.INSTANCE, TransportSnapshotUpdateStatusAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java b/core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java index f7dec3e3adacc..2bad309f1cc3b 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java @@ -76,22 +76,4 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); masterNodeTimeout = new TimeValue(in); } - - /** - * CAUTION: Use this method for the BWC purpose only. - * This method serializes a {@link MasterNodeRequest} as a {@link org.elasticsearch.transport.TransportRequest} - * without timeout. The master will have to use the default timeout setting. - */ - protected final void readFromAsTransportRequest(StreamInput in) throws IOException { - super.readFrom(in); - } - - /** - * CAUTION: Use this method for the BWC purpose only. - * This method deserializes a {@link MasterNodeRequest} from a {@link org.elasticsearch.transport.TransportRequest} - * without timeout. The master will have to use the default timeout setting. - */ - protected final void writeToAsTransportRequest(StreamOutput out) throws IOException { - super.writeTo(out); - } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 335fd0c8b12cd..259136ca9ccfd 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -23,19 +23,23 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -53,8 +57,17 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; - +import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -66,12 +79,15 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.cluster.SnapshotsInProgress.completed; /** * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for * starting and stopping shard level snapshots */ -public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { +public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateApplier, IndexEventListener { + + public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot"; private final ClusterService clusterService; @@ -79,6 +95,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final SnapshotsService snapshotsService; + private final TransportService transportService; + private final ThreadPool threadPool; private final Lock shutdownLock = new ReentrantLock(); @@ -86,21 +104,29 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final Condition shutdownCondition = shutdownLock.newCondition(); private volatile Map shardSnapshots = emptyMap(); - private final Client client; + + private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); @Inject - public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, - ThreadPool threadPool, IndicesService indicesService, Client client) { + public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, + TransportService transportService, IndicesService indicesService) { super(settings); this.indicesService = indicesService; this.snapshotsService = snapshotsService; + this.transportService = transportService; this.clusterService = clusterService; this.threadPool = threadPool; - this.client = client; if (DiscoveryNode.isDataNode(settings)) { - // this is only useful on the nodes that can hold data. - clusterService.addListener(this); + // this is only useful on the nodes that can hold data + // addLowPriorityApplier to make sure that Repository will be created before snapshot + clusterService.addLowPriorityApplier(this); } + + if (DiscoveryNode.isMasterNode(settings)) { + // This needs to run only on nodes that can become masters + transportService.registerRequestHandler(UPDATE_SNAPSHOT_ACTION_NAME, UpdateIndexShardSnapshotStatusRequest::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandler()); + } + } @Override @@ -125,11 +151,11 @@ protected void doStop() { @Override protected void doClose() { - clusterService.removeListener(this); + clusterService.removeApplier(this); } @Override - public void clusterChanged(ClusterChangedEvent event) { + public void applyClusterState(ClusterChangedEvent event) { try { SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE); SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); @@ -209,6 +235,7 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { Map> newSnapshots = new HashMap<>(); // Now go through all snapshots and update existing or create missing final String localNodeId = event.state().nodes().getLocalNodeId(); + final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); final Map> snapshotIndices = new HashMap<>(); if (snapshotsInProgress != null) { for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { @@ -259,12 +286,12 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { case DONE: logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshot(), shard.key); updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new ShardSnapshotStatus(localNodeId, State.SUCCESS)); + new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); break; case FAILURE: logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshot(), shard.key); updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new ShardSnapshotStatus(localNodeId, State.FAILED, snapshotStatus.failure())); + new ShardSnapshotStatus(localNodeId, State.FAILED, snapshotStatus.failure()), masterNode); break; default: throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage()); @@ -306,20 +333,20 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { public void doRun() { snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue()); updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.SUCCESS)); + new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); } @Override public void onFailure(Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to create snapshot", shardId, entry.getKey()), e); updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e))); + new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode); } }); } catch (Exception e) { updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e))); + new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode); } } } @@ -358,7 +385,9 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina TimeValue.timeValueMillis(snapshotStatus.time()), sb); } } - } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) { + } catch (SnapshotFailedEngineException e) { + throw e; + } catch (IndexShardSnapshotFailedException e) { throw e; } catch (Exception e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e); @@ -374,6 +403,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { return; } final String localNodeId = event.state().nodes().getLocalNodeId(); + final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { Map localShards = currentSnapshotShards(snapshot.snapshot()); @@ -389,12 +419,12 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshot(), shardId); updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new ShardSnapshotStatus(localNodeId, State.SUCCESS)); + new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); } else if (localShard.getValue().stage() == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshot(), shardId); updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, localShardStatus.failure())); + new ShardSnapshotStatus(localNodeId, State.FAILED, localShardStatus.failure()), masterNode); } } @@ -415,18 +445,148 @@ private SnapshotShards(Map shards) { } } - void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { - final UpdateSnapshotStatusRequest request = new UpdateSnapshotStatusRequest(snapshot, shardId, status); - client.admin().cluster().execute(UpdateSnapshotStatusAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(UpdateSnapshotStatusResponse updateSnapshotStatusResponse) { - logger.debug((Supplier) () -> new ParameterizedMessage("snapshot state is updated on the master [{}]", request)); - } - @Override - public void onFailure(Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("failed to send/update a snapshot state [{}]", request), e); + /** + * Internal request that is used to send changes in snapshot status to master + */ + public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest { + private Snapshot snapshot; + private ShardId shardId; + private ShardSnapshotStatus status; + + public UpdateIndexShardSnapshotStatusRequest() { + + } + + public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { + this.snapshot = snapshot; + this.shardId = shardId; + this.status = status; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + snapshot = new Snapshot(in); + shardId = ShardId.readShardId(in); + status = new ShardSnapshotStatus(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + snapshot.writeTo(out); + shardId.writeTo(out); + status.writeTo(out); + } + + public Snapshot snapshot() { + return snapshot; + } + + public ShardId shardId() { + return shardId; + } + + public ShardSnapshotStatus status() { + return status; + } + + @Override + public String toString() { + return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; + } + } + + /** + * Updates the shard status + */ + public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { + UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); + try { + transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + } catch (Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e); + } + } + + /** + * Updates the shard status on master node + * + * @param request update shard status request + */ + private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) { + logger.trace("received updated snapshot restore state [{}]", request); + clusterService.submitStateUpdateTask( + "update snapshot state", + request, + ClusterStateTaskConfig.build(Priority.NORMAL), + snapshotStateExecutor, + (source, e) -> logger.warn((Supplier) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", + request.snapshot(), request.shardId(), request.status()), e)); + } + + class SnapshotStateExecutor implements ClusterStateTaskExecutor { + + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { + int changedCount = 0; + final List entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + boolean updated = false; + + for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { + if (entry.snapshot().equals(updateSnapshotState.snapshot())) { + logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); + if (updated == false) { + shards.putAll(entry.shards()); + updated = true; + } + shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); + changedCount++; + } + } + + if (updated) { + if (completed(shards.values()) == false) { + entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); + } else { + // Snapshot is finished - mark it as done + // TODO: Add PARTIAL_SUCCESS status? + SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); + entries.add(updatedEntry); + // Finalize snapshot in the repository + snapshotsService.endSnapshot(updatedEntry); + logger.info("snapshot [{}] is done", updatedEntry.snapshot()); + } + } else { + entries.add(entry); + } + } + if (changedCount > 0) { + logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); + + final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); + return ClusterTasksResult.builder().successes(tasks).build( + ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()); + } } - }); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } } + + /** + * Transport request handler that is used to send changes in snapshot status to master + */ + class UpdateSnapshotStateRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(UpdateIndexShardSnapshotStatusRequest request, final TransportChannel channel) throws Exception { + innerUpdateSnapshotState(request); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index be55009cf7e28..037db4d5caf66 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -97,7 +97,7 @@ *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method
  • *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#updateIndexShardSnapshotStatus} method
  • - *
  • When last shard is completed master node in {@link TransportSnapshotUpdateStatusAction#innerUpdateSnapshotState} method marks the snapshot as completed
  • + *
  • When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed
  • *
  • After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository, * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state
  • * diff --git a/core/src/main/java/org/elasticsearch/snapshots/TransportSnapshotUpdateStatusAction.java b/core/src/main/java/org/elasticsearch/snapshots/TransportSnapshotUpdateStatusAction.java deleted file mode 100644 index 2b950e2d2dfe7..0000000000000 --- a/core/src/main/java/org/elasticsearch/snapshots/TransportSnapshotUpdateStatusAction.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.snapshots; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskConfig; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.ClusterStateTaskListener; -import org.elasticsearch.cluster.NotMasterException; -import org.elasticsearch.cluster.SnapshotsInProgress; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; - -import java.util.ArrayList; -import java.util.List; - -import static org.elasticsearch.cluster.SnapshotsInProgress.completed; - -/** - * A {@link TransportSnapshotUpdateStatusAction} receives snapshot state messages from {@link SnapshotShardsService}, - * then computes and updates the {@link ClusterState}. - */ -public class TransportSnapshotUpdateStatusAction extends TransportMasterNodeAction { - private final SnapshotUpdateStateExecutor snapshotStateExecutor; - - @Inject - public TransportSnapshotUpdateStatusAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, - ActionFilters actionFilters, SnapshotsService snapshotsService) { - super(settings, UpdateSnapshotStatusAction.NAME, transportService, clusterService, - threadPool, actionFilters, indexNameExpressionResolver, UpdateSnapshotStatusRequest::new); - this.snapshotStateExecutor = new SnapshotUpdateStateExecutor(snapshotsService, logger); - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected UpdateSnapshotStatusResponse newResponse() { - return new UpdateSnapshotStatusResponse(); - } - - @Override - protected void masterOperation(UpdateSnapshotStatusRequest request, ClusterState state, - ActionListener listener) throws Exception { - innerUpdateSnapshotState(request, listener); - } - - @Override - protected ClusterBlockException checkBlock(UpdateSnapshotStatusRequest request, ClusterState state) { - return null; - } - - void innerUpdateSnapshotState(final UpdateSnapshotStatusRequest request, ActionListener listener) { - logger.trace((Supplier) () -> new ParameterizedMessage("received updated snapshot restore status [{}]", request)); - clusterService.submitStateUpdateTask( - "update snapshot state", - request, - ClusterStateTaskConfig.build(Priority.NORMAL), - snapshotStateExecutor, - new ClusterStateTaskListener() { - @Override - public void onFailure(String source, Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage( - "unexpected failure while updating snapshot status [{}]", request), e); - try { - listener.onFailure(e); - } catch (Exception channelException) { - channelException.addSuppressed(e); - logger.warn((Supplier) () -> new ParameterizedMessage( - "failed to send failure [{}] while updating snapshot status [{}]", e, request), channelException); - } - } - - @Override - public void onNoLongerMaster(String source) { - logger.error("no longer master while updating snapshot status [{}]", request); - try { - listener.onFailure(new NotMasterException(source)); - } catch (Exception channelException) { - logger.warn((Supplier) () -> new ParameterizedMessage( - "{} failed to send no longer master updating snapshot[{}]", request.snapshot(), request), channelException); - } - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - try { - listener.onResponse(new UpdateSnapshotStatusResponse()); - } catch (Exception channelException) { - logger.warn((Supplier) () -> new ParameterizedMessage( - "failed to send response after updating snapshot status [{}]", request), channelException); - } - } - } - ); - } - - // The client node sends the update message to the master, then the master node updates the ClusterState. - static class SnapshotUpdateStateExecutor implements ClusterStateTaskExecutor { - private final SnapshotsService snapshotsService; - private final Logger logger; - - SnapshotUpdateStateExecutor(SnapshotsService snapshotsService, Logger logger) { - this.snapshotsService = snapshotsService; - this.logger = logger; - } - - @Override - public ClusterTasksResult execute(ClusterState currentState, - List tasks) throws Exception { - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { - int changedCount = 0; - final List entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - boolean updated = false; - - for (UpdateSnapshotStatusRequest updateSnapshotState : tasks) { - if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] Updating shard [{}] with status [{}]", - updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); - } - if (updated == false) { - shards.putAll(entry.shards()); - updated = true; - } - shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); - changedCount++; - } - } - - if (updated) { - if (completed(shards.values()) == false) { - entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); - } else { - // Snapshot is finished - mark it as done - // TODO: Add PARTIAL_SUCCESS status? - SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, - SnapshotsInProgress.State.SUCCESS, shards.build()); - entries.add(updatedEntry); - // Finalize snapshot in the repository - snapshotsService.endSnapshot(updatedEntry); - logger.info("snapshot [{}] is done", updatedEntry.snapshot()); - } - } else { - entries.add(entry); - } - } - if (changedCount > 0) { - if (logger.isTraceEnabled()) { - logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - } - final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress( - entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterTasksResult.builder().successes(tasks).build( - ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()); - } - } - return ClusterTasksResult.builder().successes(tasks).build(currentState); - } - } - -} diff --git a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusAction.java b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusAction.java deleted file mode 100644 index d8253e7bed316..0000000000000 --- a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusAction.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.snapshots; - -import org.elasticsearch.action.Action; -import org.elasticsearch.client.ElasticsearchClient; - -public class UpdateSnapshotStatusAction extends Action { - public static final UpdateSnapshotStatusAction INSTANCE = new UpdateSnapshotStatusAction(); - public static final String NAME = "internal:cluster/snapshot/update_snapshot"; - - public UpdateSnapshotStatusAction() { - super(NAME); - } - - @Override - public UpdateSnapshotStatusRequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new UpdateSnapshotStatusRequestBuilder(client, UpdateSnapshotStatusAction.INSTANCE); - } - - @Override - public UpdateSnapshotStatusResponse newResponse() { - return new UpdateSnapshotStatusResponse(); - } -} diff --git a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequest.java b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequest.java deleted file mode 100644 index 4675c65f77228..0000000000000 --- a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.snapshots; - -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.cluster.SnapshotsInProgress; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; - -/** - * Internal request that is used to send changes in snapshot status to master - */ -class UpdateSnapshotStatusRequest extends MasterNodeRequest { - private Snapshot snapshot; - private ShardId shardId; - private SnapshotsInProgress.ShardSnapshotStatus status; - - UpdateSnapshotStatusRequest() { - - } - - UpdateSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) { - this.snapshot = snapshot; - this.shardId = shardId; - this.status = status; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - // To keep BWC, we have to deserialize a MasterNodeRequest from a TransportRequest from older versions. - if (in.getVersion().before(Version.V_7_0_0_alpha1)) { - super.readFromAsTransportRequest(in); - } else { - super.readFrom(in); - } - snapshot = new Snapshot(in); - shardId = ShardId.readShardId(in); - status = new SnapshotsInProgress.ShardSnapshotStatus(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - // To keep BWC, we have to serialize a MasterNodeRequest as a TransportRequest for older versions. - if (out.getVersion().before(Version.V_7_0_0_alpha1)) { - super.writeToAsTransportRequest(out); - } else { - super.writeTo(out); - } - snapshot.writeTo(out); - shardId.writeTo(out); - status.writeTo(out); - } - - Snapshot snapshot() { - return snapshot; - } - - ShardId shardId() { - return shardId; - } - - SnapshotsInProgress.ShardSnapshotStatus status() { - return status; - } - - @Override - public String toString() { - return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; - } -} diff --git a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestBuilder.java b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestBuilder.java deleted file mode 100644 index 40a40e7d510d7..0000000000000 --- a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestBuilder.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.snapshots; - -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.client.ElasticsearchClient; - -class UpdateSnapshotStatusRequestBuilder extends MasterNodeOperationRequestBuilder { - UpdateSnapshotStatusRequestBuilder(ElasticsearchClient client, UpdateSnapshotStatusAction action) { - super(client, action, new UpdateSnapshotStatusRequest()); - } -} diff --git a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusResponse.java b/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusResponse.java deleted file mode 100644 index 62b149843e3d7..0000000000000 --- a/core/src/main/java/org/elasticsearch/snapshots/UpdateSnapshotStatusResponse.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.snapshots; - -import org.elasticsearch.action.ActionResponse; - -class UpdateSnapshotStatusResponse extends ActionResponse { - -} diff --git a/core/src/test/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestTests.java b/core/src/test/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestTests.java deleted file mode 100644 index 341e95037134f..0000000000000 --- a/core/src/test/java/org/elasticsearch/snapshots/UpdateSnapshotStatusRequestTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.snapshots; - -import org.elasticsearch.Version; -import org.elasticsearch.cluster.SnapshotsInProgress; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.ESTestCase; - -import java.util.UUID; - -import static org.hamcrest.Matchers.equalTo; - -public class UpdateSnapshotStatusRequestTests extends ESTestCase { - - public void testSerializeForOlderVersion() throws Exception { - UpdateSnapshotStatusRequest originalRequest = new UpdateSnapshotStatusRequest(randomSnapshot(), randomShardId(), randomStatus()); - Version bwcVersion = randomFrom(Version.V_5_1_2, Version.V_5_4_1, Version.V_6_0_0_alpha1); - final UpdateSnapshotStatusRequest cloneRequest = new UpdateSnapshotStatusRequest(); - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.setVersion(bwcVersion); - originalRequest.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - in.setVersion(bwcVersion); - cloneRequest.readFrom(in); - } - } - assertThat(cloneRequest.snapshot(), equalTo(originalRequest.snapshot())); - assertThat(cloneRequest.shardId(), equalTo(originalRequest.shardId())); - assertThat(cloneRequest.status(), equalTo(originalRequest.status())); - } - - public void testSerializeForCurrentVersion() throws Exception { - UpdateSnapshotStatusRequest originalRequest = new UpdateSnapshotStatusRequest(randomSnapshot(), randomShardId(), randomStatus()); - final UpdateSnapshotStatusRequest cloneRequest = new UpdateSnapshotStatusRequest(); - try (BytesStreamOutput out = new BytesStreamOutput()) { - originalRequest.writeTo(out); - try (StreamInput in = out.bytes().streamInput()) { - cloneRequest.readFrom(in); - } - } - assertThat(cloneRequest.snapshot(), equalTo(originalRequest.snapshot())); - assertThat(cloneRequest.shardId(), equalTo(originalRequest.shardId())); - assertThat(cloneRequest.status(), equalTo(originalRequest.status())); - } - - private ShardId randomShardId() { - return new ShardId(new Index(randomAlphaOfLength(10), UUID.randomUUID().toString()), between(0, 1000)); - } - - private Snapshot randomSnapshot() { - SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(10), UUID.randomUUID().toString()); - return new Snapshot(randomAlphaOfLength(10), snapshotId); - } - - private SnapshotsInProgress.ShardSnapshotStatus randomStatus() { - return new SnapshotsInProgress.ShardSnapshotStatus("node-" + between(0, 100), - randomFrom(SnapshotsInProgress.State.values()), randomAlphaOfLength(20)); - } -} diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index a49df4d083980..66185325931d8 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -54,11 +54,6 @@ for (Version version : wireCompatVersions) { if (project.bwc_tests_enabled) { bwcTest.dependsOn(versionBwcTest) } - - /* To support taking index snapshots, we have to set path.repo setting */ - tasks.getByName("${baseName}#mixedClusterTestRunner").configure { - systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo") - } } test.enabled = false // no unit tests for rolling upgrades, only the rest integration test diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index e5fa0ccb8eb13..c6200417e39d8 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; @@ -43,9 +42,7 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; public class IndexingIT extends ESRestTestCase { @@ -240,64 +237,6 @@ public void testSeqNoCheckpoints() throws Exception { } } - /** - * Previously, we use a TransportHandler to send/receive the snapshot state messages. - * This handler does not retry if there is failure. We decided to replace it by TransportMasterNodeAction which does retry if needed. - * - * This test makes sure that these two approaches are compatible to one another. - * In particular, the master node should receive and handle the snapshot state messages from old nodes properly. - */ - public void testUpdateSnapshotStates() throws Exception { - Nodes nodes = buildNodeAndVersions(); - assertThat(nodes.getNewNodes(), not(empty())); - logger.info("cluster discovered: {}", nodes.toString()); - - // Create the repository before taking the snapshot. - String repoConfig = JsonXContent.contentBuilder() - .startObject() - .field("type", "fs") - .startObject("settings") - .field("compress", randomBoolean()) - .field("location", System.getProperty("tests.path.repo")) - .endObject() - .endObject() - .string(); - - assertOK( - client().performRequest("PUT", "/_snapshot/repo", emptyMap(), - new StringEntity(repoConfig, ContentType.APPLICATION_JSON)) - ); - - String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); - - // Allocating shards on the BWC nodes to makes sure that taking snapshot happens on those nodes. - Settings.Builder settings = Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(5, 10)) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) - .put("index.routing.allocation.include._name", bwcNames); - - final String index = "test-snapshot-index"; - createIndex(index, settings.build()); - indexDocs(index, 0, between(50, 100)); - ensureGreen(); - assertOK(client().performRequest("POST", index + "/_refresh")); - - assertOK( - client().performRequest("PUT", "/_snapshot/repo/bwc-snapshot", singletonMap("wait_for_completion", "true"), - new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON)) - ); - - // Allocating shards on all nodes, taking snapshots should happen on all nodes. - updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); - ensureGreen(); - assertOK(client().performRequest("POST", index + "/_refresh")); - - assertOK( - client().performRequest("PUT", "/_snapshot/repo/mixed-snapshot", singletonMap("wait_for_completion", "true"), - new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON)) - ); - } - private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference)); assertOK(response); From 97c47243283c1568b027aa1f43bcd434538cf50e Mon Sep 17 00:00:00 2001 From: Nhat Date: Sun, 5 Nov 2017 23:06:03 -0500 Subject: [PATCH 3/7] Uses two handlers --- .../snapshots/SnapshotShardsService.java | 175 +++++++++++++++--- qa/mixed-cluster/build.gradle | 5 + .../elasticsearch/backwards/IndexingIT.java | 54 ++++++ 3 files changed, 213 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 259136ca9ccfd..2b97b6b4039e4 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -23,14 +23,24 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.State; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -85,9 +95,11 @@ * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for * starting and stopping shard level snapshots */ -public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateApplier, IndexEventListener { +public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { + + public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot"; + public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; - public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot"; private final ClusterService clusterService; @@ -106,10 +118,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private volatile Map shardSnapshots = emptyMap(); private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); + private UpdateSnapshotStatusAction updateSnapshotStatusHandler; @Inject public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, - TransportService transportService, IndicesService indicesService) { + TransportService transportService, IndicesService indicesService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings); this.indicesService = indicesService; this.snapshotsService = snapshotsService; @@ -118,20 +132,26 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S this.threadPool = threadPool; if (DiscoveryNode.isDataNode(settings)) { // this is only useful on the nodes that can hold data - // addLowPriorityApplier to make sure that Repository will be created before snapshot - clusterService.addLowPriorityApplier(this); + clusterService.addListener(this); } if (DiscoveryNode.isMasterNode(settings)) { // This needs to run only on nodes that can become masters - transportService.registerRequestHandler(UPDATE_SNAPSHOT_ACTION_NAME, UpdateIndexShardSnapshotStatusRequest::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandler()); + transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6()); + // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. + this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); } } @Override protected void doStart() { - + if (DiscoveryNode.isMasterNode(settings)) { + assert this.updateSnapshotStatusHandler != null; + assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null; + assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; + } } @Override @@ -151,11 +171,11 @@ protected void doStop() { @Override protected void doClose() { - clusterService.removeApplier(this); + clusterService.removeListener(this); } @Override - public void applyClusterState(ClusterChangedEvent event) { + public void clusterChanged(ClusterChangedEvent event) { try { SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE); SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); @@ -449,7 +469,7 @@ private SnapshotShards(Map shards) { /** * Internal request that is used to send changes in snapshot status to master */ - public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest { + public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest { private Snapshot snapshot; private ShardId shardId; private ShardSnapshotStatus status; @@ -464,6 +484,11 @@ public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, this.status = status; } + @Override + public ActionRequestValidationException validate() { + return null; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -502,11 +527,16 @@ public String toString() { * Updates the shard status */ public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); try { - transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); + transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + } else { + UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status); + transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME); + } } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e); + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); } } @@ -515,15 +545,24 @@ public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, S * * @param request update shard status request */ - private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) { + private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, ActionListener listener) { logger.trace("received updated snapshot restore state [{}]", request); clusterService.submitStateUpdateTask( "update snapshot state", request, ClusterStateTaskConfig.build(Priority.NORMAL), snapshotStateExecutor, - (source, e) -> logger.warn((Supplier) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", - request.snapshot(), request.shardId(), request.status()), e)); + new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new UpdateIndexShardSnapshotStatusResponse()); + } + }); } class SnapshotStateExecutor implements ClusterStateTaskExecutor { @@ -578,13 +617,107 @@ public ClusterTasksResult execute(Cluster } } + static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { + + } + + class UpdateSnapshotStatusAction extends TransportMasterNodeAction { + UpdateSnapshotStatusAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected UpdateIndexShardSnapshotStatusResponse newResponse() { + return new UpdateIndexShardSnapshotStatusResponse(); + } + + @Override + protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, ActionListener listener) throws Exception { + innerUpdateSnapshotState(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { + return null; + } + } + /** - * Transport request handler that is used to send changes in snapshot status to master + * A BWC version of {@link UpdateIndexShardSnapshotStatusRequest} */ - class UpdateSnapshotStateRequestHandler implements TransportRequestHandler { + static class UpdateSnapshotStatusRequestV6 extends TransportRequest { + private Snapshot snapshot; + private ShardId shardId; + private ShardSnapshotStatus status; + + UpdateSnapshotStatusRequestV6() { + + } + + UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { + this.snapshot = snapshot; + this.shardId = shardId; + this.status = status; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + snapshot = new Snapshot(in); + shardId = ShardId.readShardId(in); + status = new ShardSnapshotStatus(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + snapshot.writeTo(out); + shardId.writeTo(out); + status.writeTo(out); + } + + Snapshot snapshot() { + return snapshot; + } + + ShardId shardId() { + return shardId; + } + + ShardSnapshotStatus status() { + return status; + } + + @Override + public String toString() { + return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; + } + } + + /** + * A BWC version of {@link UpdateSnapshotStatusAction} + */ + class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler { @Override - public void messageReceived(UpdateIndexShardSnapshotStatusRequest request, final TransportChannel channel) throws Exception { - innerUpdateSnapshotState(request); + public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception { + final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status()); + innerUpdateSnapshotState(request, new ActionListener() { + @Override + public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) { + + } + + @Override + public void onFailure(Exception e) { + logger.warn("Failed to update snapshot status", e); + } + }); channel.sendResponse(TransportResponse.Empty.INSTANCE); } } diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 781a69684e5d4..a3cc3e6ac6b2b 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -54,6 +54,11 @@ for (Version version : wireCompatVersions) { if (project.bwc_tests_enabled) { bwcTest.dependsOn(versionBwcTest) } + + /* To support taking index snapshots, we have to set path.repo setting */ + tasks.getByName("${baseName}#mixedClusterTestRunner").configure { + systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo") + } } test.enabled = false // no unit tests for rolling upgrades, only the rest integration test diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index f744b3029b125..9de8954c531ff 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; @@ -42,7 +43,9 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; public class IndexingIT extends ESRestTestCase { @@ -237,6 +240,57 @@ public void testSeqNoCheckpoints() throws Exception { } } + public void testUpdateSnapshotStatus() throws Exception { + Nodes nodes = buildNodeAndVersions(); + assertThat(nodes.getNewNodes(), not(empty())); + logger.info("cluster discovered: {}", nodes.toString()); + + // Create the repository before taking the snapshot. + String repoConfig = JsonXContent.contentBuilder() + .startObject() + .field("type", "fs") + .startObject("settings") + .field("compress", randomBoolean()) + .field("location", System.getProperty("tests.path.repo")) + .endObject() + .endObject() + .string(); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo", emptyMap(), + new StringEntity(repoConfig, ContentType.APPLICATION_JSON)) + ); + + String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); + + // Allocating shards on the BWC nodes to makes sure that taking snapshot happens on those nodes. + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(5, 10)) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put("index.routing.allocation.include._name", bwcNames); + + final String index = "test-snapshot-index"; + createIndex(index, settings.build()); + indexDocs(index, 0, between(50, 100)); + ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo/bwc-snapshot", singletonMap("wait_for_completion", "true"), + new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON)) + ); + + // Allocating shards on all nodes, taking snapshots should happen on all nodes. + updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); + ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo/mixed-snapshot", singletonMap("wait_for_completion", "true"), + new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON)) + ); + } + private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference)); assertOK(response); From 06292d14fdc6999aab88ab4d26be8a0bacd90a83 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 14 Nov 2017 15:25:50 -0500 Subject: [PATCH 4/7] add a disruption test --- .../snapshots/SnapshotShardsServiceIT.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java diff --git a/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java new file mode 100644 index 0000000000000..c5c5904c42465 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockRepository.Plugin.class, MockTransportService.TestPlugin.class); + } + + public void testRetryPostingSnapshotStatusMessages() throws Exception { + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + final int shards = between(1, 10); + assertAcked(prepareCreate("test-index", 0, Settings.builder().put("number_of_shards", shards).put("number_of_replicas", 0))); + ensureGreen(); + final int numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index("test-index", "doc", Integer.toString(i)); + } + + logger.info("--> blocking repository"); + String blockedNode = blockNodeWithIndex("test-repo", "test-index"); + dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + .setWaitForCompletion(false) + .setIndices("test-index") + .get(); + waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + + logger.info("--> start disrupting cluster"); + final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(masterNode, dataNode), + NetworkDisruption.NetworkDelay.random(random())); + internalCluster().setDisruptionScheme(networkDisruption); + networkDisruption.startDisrupting(); + + logger.info("--> unblocking repository"); + unblockNode("test-repo", blockedNode); + Thread.sleep(200); + logger.info("--> stop disrupting cluster"); + internalCluster().clearDisruptionScheme(true); + + assertBusy(() -> { + GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster() + .prepareGetSnapshots("test-repo") + .setSnapshots("test-snap").get(); + logger.info("Status size [{}]", snapshotsStatusResponse.getSnapshots().get(0).successfulShards()); + SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), equalTo(shards)); + }, 10, TimeUnit.SECONDS); + } +} From ea7ec3889fc31d83975fdcf2c05d4ecc986b2892 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 14 Nov 2017 15:27:52 -0500 Subject: [PATCH 5/7] post update messages to the local node --- .../snapshots/SnapshotShardsService.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 2b97b6b4039e4..a0dd9b4fc8d5b 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -135,22 +135,23 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S clusterService.addListener(this); } + // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. + this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); + if (DiscoveryNode.isMasterNode(settings)) { // This needs to run only on nodes that can become masters transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6()); - // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. - this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, - transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); } } @Override protected void doStart() { + assert this.updateSnapshotStatusHandler != null; + assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; if (DiscoveryNode.isMasterNode(settings)) { - assert this.updateSnapshotStatusHandler != null; assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null; - assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; } } @@ -530,7 +531,7 @@ public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, S try { if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); - transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); } else { UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status); transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME); From cb73eea012ce96cdb755b48567e768b4e8543e80 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 14 Nov 2017 15:40:27 -0500 Subject: [PATCH 6/7] try posting status forever --- .../java/org/elasticsearch/snapshots/SnapshotShardsService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index a0dd9b4fc8d5b..f8a601cc41fd2 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -483,6 +483,8 @@ public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, this.snapshot = snapshot; this.shardId = shardId; this.status = status; + // By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck. + this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE); } @Override From 85da45d788774686a3d9f90575fbbfe05f33205c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 15 Nov 2017 10:02:41 -0500 Subject: [PATCH 7/7] do not use sleep --- .../snapshots/SnapshotShardsServiceIT.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java index c5c5904c42465..651cd96776e75 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; @@ -31,10 +32,14 @@ import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.hasSize; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase { @@ -71,6 +76,9 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception { .get(); waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + final SnapshotId snapshotId = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap") + .get().getSnapshots().get(0).snapshotId(); + logger.info("--> start disrupting cluster"); final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(masterNode, dataNode), NetworkDisruption.NetworkDelay.random(random())); @@ -79,16 +87,27 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception { logger.info("--> unblocking repository"); unblockNode("test-repo", blockedNode); - Thread.sleep(200); + + // Retrieve snapshot status from the data node. + SnapshotShardsService snapshotShardsService = internalCluster().getInstance(SnapshotShardsService.class, blockedNode); + assertBusy(() -> { + final Snapshot snapshot = new Snapshot("test-repo", snapshotId); + List stages = snapshotShardsService.currentSnapshotShards(snapshot) + .values().stream().map(IndexShardSnapshotStatus::stage).collect(Collectors.toList()); + assertThat(stages, hasSize(shards)); + assertThat(stages, everyItem(equalTo(IndexShardSnapshotStatus.Stage.DONE))); + }); + logger.info("--> stop disrupting cluster"); + networkDisruption.stopDisrupting(); internalCluster().clearDisruptionScheme(true); assertBusy(() -> { GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster() .prepareGetSnapshots("test-repo") .setSnapshots("test-snap").get(); - logger.info("Status size [{}]", snapshotsStatusResponse.getSnapshots().get(0).successfulShards()); SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + logger.info("Snapshot status [{}], successfulShards [{}]", snapshotInfo.state(), snapshotInfo.successfulShards()); assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); assertThat(snapshotInfo.successfulShards(), equalTo(shards)); }, 10, TimeUnit.SECONDS);