Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,24 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -85,9 +95,11 @@
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
* starting and stopping shard level snapshots
*/
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateApplier, IndexEventListener {
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {

public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot";
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status";

public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot";

private final ClusterService clusterService;

Expand All @@ -106,10 +118,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private volatile Map<Snapshot, SnapshotShards> shardSnapshots = emptyMap();

private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
private UpdateSnapshotStatusAction updateSnapshotStatusHandler;

@Inject
public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool,
TransportService transportService, IndicesService indicesService) {
TransportService transportService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings);
this.indicesService = indicesService;
this.snapshotsService = snapshotsService;
Expand All @@ -118,20 +132,26 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S
this.threadPool = threadPool;
if (DiscoveryNode.isDataNode(settings)) {
// this is only useful on the nodes that can hold data
// addLowPriorityApplier to make sure that Repository will be created before snapshot
clusterService.addLowPriorityApplier(this);
clusterService.addListener(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment here explaining why this needs to be a listener and not an applier, and why it's OK to register it with addListener rather than addLast?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test DedicatedClusterSnapshotRestoreIT#testSnapshotWithStuckNode failed with an Applier but passed with a Listener. However, I don't really know the root cause.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@imotov, I think I have figured out the root cause. A Listener is called after the new state is set, while an Applier is called before that. In SnapshotShardsService, we call #syncShardStatsOnNewMaster -> #updateIndexShardSnapshotStatus which in turn uses a TransportMasterNodeAction. The TransportMasterNodeAction accesses the state of the cluster directly which may not be available yet if it's invoked from the Applier.

Caused by: java.lang.AssertionError: should not be called by a cluster state applier. reason [the applied cluster state is not yet available]

This explains why #testSnapshotWithStuckNode failed with an Applier but passed with a Listener.

}

if (DiscoveryNode.isMasterNode(settings)) {
// This needs to run only on nodes that can become masters
transportService.registerRequestHandler(UPDATE_SNAPSHOT_ACTION_NAME, UpdateIndexShardSnapshotStatusRequest::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandler());
transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6());
// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver);
}

}

/**
 * Lifecycle start hook. Starts nothing itself; on master-eligible nodes it only
 * asserts that the update-snapshot-status handlers were set up beforehand.
 */
@Override
protected void doStart() {
    if (!DiscoveryNode.isMasterNode(settings)) {
        // Nothing to verify on nodes that cannot become master.
        return;
    }
    assert updateSnapshotStatusHandler != null;
    assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null;
    assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
}

@Override
Expand All @@ -151,11 +171,11 @@ protected void doStop() {

/**
 * Lifecycle close hook: detaches this service from the cluster service so it
 * no longer receives cluster state notifications.
 */
@Override
protected void doClose() {
// NOTE(review): both an applier and a listener removal are issued here; confirm
// which registration is actually in effect for this service.
clusterService.removeApplier(this);
clusterService.removeListener(this);
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
public void clusterChanged(ClusterChangedEvent event) {
try {
SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);
Expand Down Expand Up @@ -449,7 +469,7 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) {
/**
* Internal request that is used to send changes in snapshot status to master
*/
public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest {
public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest<UpdateIndexShardSnapshotStatusRequest> {
private Snapshot snapshot;
private ShardId shardId;
private ShardSnapshotStatus status;
Expand All @@ -464,6 +484,11 @@ public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId,
this.status = status;
}

/**
 * Performs no checks on the request fields.
 *
 * @return always {@code null}
 */
@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand Down Expand Up @@ -502,11 +527,16 @@ public String toString() {
* Updates the shard status
*/
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) {
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
try {
transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This request has a default masterNodeTimeout of 30 seconds. Shouldn't we wait longer (possibly forever)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed cb73eea

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you send the request like this and the master is unavailable, there will be no retry (i.e. the retry code in TransportMasterNodeAction won't come into play). The action needs to be sent to the node itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed ea7ec38

} else {
UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status);
transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME);
}
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e);
}
}

Expand All @@ -515,15 +545,24 @@ public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, S
*
* @param request update shard status request
*/
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) {
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
logger.trace("received updated snapshot restore state [{}]", request);
clusterService.submitStateUpdateTask(
"update snapshot state",
request,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
(source, e) -> logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]",
request.snapshot(), request.shardId(), request.status()), e));
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
}
});
}

class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
Expand Down Expand Up @@ -578,13 +617,107 @@ public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest> execute(Cluster
}
}

/**
 * Response type of {@link UpdateSnapshotStatusAction}. It declares no fields of its own.
 */
static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {

}

/**
 * Master node action that receives an {@link UpdateIndexShardSnapshotStatusRequest} and hands it to
 * {@code innerUpdateSnapshotState}, which reports the outcome through the supplied listener.
 */
class UpdateSnapshotStatusAction
        extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {

    UpdateSnapshotStatusAction(Settings settings,
                               String actionName,
                               TransportService transportService,
                               ClusterService clusterService,
                               ThreadPool threadPool,
                               ActionFilters actionFilters,
                               IndexNameExpressionResolver indexNameExpressionResolver) {
        super(settings, actionName, transportService, clusterService, threadPool, actionFilters,
            indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new);
    }

    /**
     * @return {@link ThreadPool.Names#SAME}
     */
    @Override
    protected String executor() {
        return ThreadPool.Names.SAME;
    }

    /**
     * @return a fresh, empty response instance
     */
    @Override
    protected UpdateIndexShardSnapshotStatusResponse newResponse() {
        return new UpdateIndexShardSnapshotStatusResponse();
    }

    /**
     * Always returns {@code null}; no block is reported for this action regardless of the given state.
     */
    @Override
    protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest updateRequest, ClusterState clusterState) {
        return null;
    }

    /**
     * Delegates the status update to {@code innerUpdateSnapshotState}; the cluster state argument is not used here.
     */
    @Override
    protected void masterOperation(UpdateIndexShardSnapshotStatusRequest updateRequest,
                                   ClusterState clusterState,
                                   ActionListener<UpdateIndexShardSnapshotStatusResponse> responseListener) throws Exception {
        innerUpdateSnapshotState(updateRequest, responseListener);
    }
}

/**
* Transport request handler that is used to send changes in snapshot status to master
* A BWC version of {@link UpdateIndexShardSnapshotStatusRequest}
*/
class UpdateSnapshotStateRequestHandler implements TransportRequestHandler<UpdateIndexShardSnapshotStatusRequest> {
/**
 * Transport request carrying a snapshot, a shard id and the shard's snapshot status.
 */
static class UpdateSnapshotStatusRequestV6 extends TransportRequest {
    private Snapshot snapshot;
    private ShardId shardId;
    private ShardSnapshotStatus status;

    /** Creates an empty request whose fields are filled in by {@link #readFrom(StreamInput)}. */
    UpdateSnapshotStatusRequestV6() {
    }

    /** Creates a request for the given snapshot, shard and status. */
    UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
        this.snapshot = snapshot;
        this.shardId = shardId;
        this.status = status;
    }

    /** Reads the snapshot, the shard id and the status, in that order, after the parent's data. */
    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        this.snapshot = new Snapshot(in);
        this.shardId = ShardId.readShardId(in);
        this.status = new ShardSnapshotStatus(in);
    }

    /** Writes the snapshot, the shard id and the status, in that order, after the parent's data. */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        this.snapshot.writeTo(out);
        this.shardId.writeTo(out);
        this.status.writeTo(out);
    }

    Snapshot snapshot() {
        return this.snapshot;
    }

    ShardId shardId() {
        return this.shardId;
    }

    ShardSnapshotStatus status() {
        return this.status;
    }

    /** Renders the snapshot, the shard id and the state of the status. */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append(this.snapshot);
        text.append(", shardId [").append(this.shardId);
        text.append("], status [").append(this.status.state());
        text.append("]");
        return text.toString();
    }
}

/**
* A BWC version of {@link UpdateSnapshotStatusAction}
*/
class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler<UpdateSnapshotStatusRequestV6> {
@Override
public void messageReceived(UpdateIndexShardSnapshotStatusRequest request, final TransportChannel channel) throws Exception {
innerUpdateSnapshotState(request);
public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception {
final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, masterNodeTimeout?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

innerUpdateSnapshotState(request, new ActionListener<UpdateIndexShardSnapshotStatusResponse>() {
@Override
public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) {

}

@Override
public void onFailure(Exception e) {
logger.warn("Failed to update snapshot status", e);
}
});
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
Expand Down
5 changes: 5 additions & 0 deletions qa/mixed-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ for (Version version : wireCompatVersions) {
if (project.bwc_tests_enabled) {
bwcTest.dependsOn(versionBwcTest)
}

/* To support taking index snapshots, we have to set path.repo setting */
tasks.getByName("${baseName}#mixedClusterTestRunner").configure {
systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo")
}
}

test.enabled = false // no unit tests for rolling upgrades, only the rest integration test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
Expand All @@ -42,7 +43,9 @@
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class IndexingIT extends ESRestTestCase {

Expand Down Expand Up @@ -237,6 +240,57 @@ public void testSeqNoCheckpoints() throws Exception {
}
}

/**
 * Takes two snapshots of the same index: one while its shards are restricted to the BWC nodes,
 * and one after the allocation filter has been removed.
 */
public void testUpdateSnapshotStatus() throws Exception {
    final Nodes nodes = buildNodeAndVersions();
    assertThat(nodes.getNewNodes(), not(empty()));
    logger.info("cluster discovered: {}", nodes.toString());

    // Register an "fs" repository located at the path passed in via tests.path.repo.
    final String repositoryBody = JsonXContent.contentBuilder()
        .startObject()
            .field("type", "fs")
            .startObject("settings")
                .field("compress", randomBoolean())
                .field("location", System.getProperty("tests.path.repo"))
            .endObject()
        .endObject()
        .string();
    final Response repositoryResponse = client().performRequest("PUT", "/_snapshot/repo", emptyMap(),
        new StringEntity(repositoryBody, ContentType.APPLICATION_JSON));
    assertOK(repositoryResponse);

    // Restrict allocation to the BWC nodes to make sure the first snapshot is taken on those nodes.
    final String bwcNodeNames = nodes.getBWCNodes().stream()
        .map(Node::getNodeName)
        .collect(Collectors.joining(","));
    final Settings indexSettings = Settings.builder()
        .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(5, 10))
        .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
        .put("index.routing.allocation.include._name", bwcNodeNames)
        .build();

    final String index = "test-snapshot-index";
    final String snapshotBody = "{\"indices\": \"" + index + "\"}";
    createIndex(index, indexSettings);
    indexDocs(index, 0, between(50, 100));
    ensureGreen();
    assertOK(client().performRequest("POST", index + "/_refresh"));

    final Response bwcSnapshotResponse = client().performRequest("PUT", "/_snapshot/repo/bwc-snapshot",
        singletonMap("wait_for_completion", "true"),
        new StringEntity(snapshotBody, ContentType.APPLICATION_JSON));
    assertOK(bwcSnapshotResponse);

    // Drop the allocation filter so shards may be placed on all nodes before the second snapshot.
    updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
    ensureGreen();
    assertOK(client().performRequest("POST", index + "/_refresh"));

    final Response mixedSnapshotResponse = client().performRequest("PUT", "/_snapshot/repo/mixed-snapshot",
        singletonMap("wait_for_completion", "true"),
        new StringEntity(snapshotBody, ContentType.APPLICATION_JSON));
    assertOK(mixedSnapshotResponse);
}

private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
assertOK(response);
Expand Down