Skip to content
6 changes: 3 additions & 3 deletions core/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
Expand All @@ -199,7 +198,6 @@
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -311,6 +309,8 @@
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.snapshots.TransportSnapshotUpdateStatusAction;
import org.elasticsearch.snapshots.UpdateSnapshotStatusAction;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;

Expand All @@ -324,7 +324,6 @@
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;

/**
Expand Down Expand Up @@ -432,6 +431,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
actions.register(UpdateSnapshotStatusAction.INSTANCE, TransportSnapshotUpdateStatusAction.class);

actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,22 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
masterNodeTimeout = new TimeValue(in);
}

/**
* CAUTION: Use this method for the BWC purpose only.
* This method serializes a {@link MasterNodeRequest} as a {@link org.elasticsearch.transport.TransportRequest}
* without timeout. The master will have to use the default timeout setting.
*/
protected final void readFromAsTransportRequest(StreamInput in) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is clever, but at the same time might be trap-y. This class is used in a lot of places and I think I would rather have 2 different clean implementation in 6.x and one clean implementation in 7.x then this here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, having two handlers may be safer than this approach.

super.readFrom(in);
}

/**
* CAUTION: Use this method for the BWC purpose only.
* This method deserializes a {@link MasterNodeRequest} from a {@link org.elasticsearch.transport.TransportRequest}
* without timeout. The master will have to use the default timeout setting.
*/
protected final void writeToAsTransportRequest(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
* start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method</li>
* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#updateIndexShardSnapshotStatus} method</li>
* <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed</li>
* <li>When last shard is completed master node in {@link TransportSnapshotUpdateStatusAction#innerUpdateSnapshotState} method marks the snapshot as completed</li>
* <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository,
* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
* </ul>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.snapshots;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.cluster.SnapshotsInProgress.completed;

/**
* A {@link TransportSnapshotUpdateStatusAction} receives snapshot state messages from {@link SnapshotShardsService},
* then computes and updates the {@link ClusterState}.
*/
public class TransportSnapshotUpdateStatusAction extends TransportMasterNodeAction<UpdateSnapshotStatusRequest,
UpdateSnapshotStatusResponse> {
private final SnapshotUpdateStateExecutor snapshotStateExecutor;

@Inject
public TransportSnapshotUpdateStatusAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver,
ActionFilters actionFilters, SnapshotsService snapshotsService) {
super(settings, UpdateSnapshotStatusAction.NAME, transportService, clusterService,
threadPool, actionFilters, indexNameExpressionResolver, UpdateSnapshotStatusRequest::new);
this.snapshotStateExecutor = new SnapshotUpdateStateExecutor(snapshotsService, logger);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected UpdateSnapshotStatusResponse newResponse() {
return new UpdateSnapshotStatusResponse();
}

@Override
protected void masterOperation(UpdateSnapshotStatusRequest request, ClusterState state,
ActionListener<UpdateSnapshotStatusResponse> listener) throws Exception {
innerUpdateSnapshotState(request, listener);
}

@Override
protected ClusterBlockException checkBlock(UpdateSnapshotStatusRequest request, ClusterState state) {
return null;
}

void innerUpdateSnapshotState(final UpdateSnapshotStatusRequest request, ActionListener<UpdateSnapshotStatusResponse> listener) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("received updated snapshot restore status [{}]", request));
clusterService.submitStateUpdateTask(
"update snapshot state",
request,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage(
"unexpected failure while updating snapshot status [{}]", request), e);
try {
listener.onFailure(e);
} catch (Exception channelException) {
channelException.addSuppressed(e);
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
"failed to send failure [{}] while updating snapshot status [{}]", e, request), channelException);
}
}

@Override
public void onNoLongerMaster(String source) {
logger.error("no longer master while updating snapshot status [{}]", request);
try {
listener.onFailure(new NotMasterException(source));
} catch (Exception channelException) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
"{} failed to send no longer master updating snapshot[{}]", request.snapshot(), request), channelException);
}
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
listener.onResponse(new UpdateSnapshotStatusResponse());
} catch (Exception channelException) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
"failed to send response after updating snapshot status [{}]", request), channelException);
}
}
}
);
}

// The client node sends the update message to the master, then the master node updates the ClusterState.
static class SnapshotUpdateStateExecutor implements ClusterStateTaskExecutor<UpdateSnapshotStatusRequest> {
private final SnapshotsService snapshotsService;
private final Logger logger;

SnapshotUpdateStateExecutor(SnapshotsService snapshotsService, Logger logger) {
this.snapshotsService = snapshotsService;
this.logger = logger;
}

@Override
public ClusterTasksResult<UpdateSnapshotStatusRequest> execute(ClusterState currentState,
List<UpdateSnapshotStatusRequest> tasks) throws Exception {
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
int changedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;

for (UpdateSnapshotStatusRequest updateSnapshotState : tasks) {
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] Updating shard [{}] with status [{}]",
updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state());
}
if (updated == false) {
shards.putAll(entry.shards());
updated = true;
}
shards.put(updateSnapshotState.shardId(), updateSnapshotState.status());
changedCount++;
}
}

if (updated) {
if (completed(shards.values()) == false) {
entries.add(new SnapshotsInProgress.Entry(entry, shards.build()));
} else {
// Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry,
SnapshotsInProgress.State.SUCCESS, shards.build());
entries.add(updatedEntry);
// Finalize snapshot in the repository
snapshotsService.endSnapshot(updatedEntry);
logger.info("snapshot [{}] is done", updatedEntry.snapshot());
}
} else {
entries.add(entry);
}
}
if (changedCount > 0) {
if (logger.isTraceEnabled()) {
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
}
final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(
entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
return ClusterTasksResult.<UpdateSnapshotStatusRequest>builder().successes(tasks).build(
ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build());
}
}
return ClusterTasksResult.<UpdateSnapshotStatusRequest>builder().successes(tasks).build(currentState);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.snapshots;

import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

public class UpdateSnapshotStatusAction extends Action<UpdateSnapshotStatusRequest, UpdateSnapshotStatusResponse,
UpdateSnapshotStatusRequestBuilder> {
public static final UpdateSnapshotStatusAction INSTANCE = new UpdateSnapshotStatusAction();
public static final String NAME = "internal:cluster/snapshot/update_snapshot";

public UpdateSnapshotStatusAction() {
super(NAME);
}

@Override
public UpdateSnapshotStatusRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new UpdateSnapshotStatusRequestBuilder(client, UpdateSnapshotStatusAction.INSTANCE);
}

@Override
public UpdateSnapshotStatusResponse newResponse() {
return new UpdateSnapshotStatusResponse();
}
}
Loading