|
23 | 23 | import org.apache.logging.log4j.message.ParameterizedMessage; |
24 | 24 | import org.apache.logging.log4j.util.Supplier; |
25 | 25 | import org.elasticsearch.ExceptionsHelper; |
26 | | -import org.elasticsearch.Version; |
27 | 26 | import org.elasticsearch.action.ActionListener; |
28 | 27 | import org.elasticsearch.action.ActionRequestValidationException; |
29 | 28 | import org.elasticsearch.action.ActionResponse; |
|
68 | 67 | import org.elasticsearch.repositories.Repository; |
69 | 68 | import org.elasticsearch.threadpool.ThreadPool; |
70 | 69 | import org.elasticsearch.transport.EmptyTransportResponseHandler; |
71 | | -import org.elasticsearch.transport.TransportChannel; |
72 | | -import org.elasticsearch.transport.TransportRequest; |
73 | | -import org.elasticsearch.transport.TransportRequestHandler; |
74 | | -import org.elasticsearch.transport.TransportResponse; |
75 | 70 | import org.elasticsearch.transport.TransportService; |
76 | 71 |
|
77 | 72 | import java.io.IOException; |
|
97 | 92 | */ |
98 | 93 | public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { |
99 | 94 |
|
100 | | - public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot"; |
101 | 95 | public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; |
102 | 96 |
|
103 | | - |
104 | 97 | private final ClusterService clusterService; |
105 | 98 |
|
106 | 99 | private final IndicesService indicesService; |
@@ -138,21 +131,12 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S |
138 | 131 | // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. |
139 | 132 | this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, |
140 | 133 | transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); |
141 | | - |
142 | | - if (DiscoveryNode.isMasterNode(settings)) { |
143 | | - // This needs to run only on nodes that can become masters |
144 | | - transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6()); |
145 | | - } |
146 | | - |
147 | 134 | } |
148 | 135 |
|
149 | 136 | @Override |
150 | 137 | protected void doStart() { |
151 | 138 | assert this.updateSnapshotStatusHandler != null; |
152 | 139 | assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; |
153 | | - if (DiscoveryNode.isMasterNode(settings)) { |
154 | | - assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null; |
155 | | - } |
156 | 140 | } |
157 | 141 |
|
158 | 142 | @Override |
@@ -531,13 +515,8 @@ public String toString() { |
531 | 515 | */ |
532 | 516 | public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { |
533 | 517 | try { |
534 | | - if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { |
535 | | - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); |
536 | | - transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); |
537 | | - } else { |
538 | | - UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status); |
539 | | - transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME); |
540 | | - } |
| 518 | + UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); |
| 519 | + transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); |
541 | 520 | } catch (Exception e) { |
542 | 521 | logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); |
543 | 522 | } |
@@ -651,78 +630,4 @@ protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest |
651 | 630 | } |
652 | 631 | } |
653 | 632 |
|
654 | | - /** |
655 | | - * A BWC version of {@link UpdateIndexShardSnapshotStatusRequest} |
656 | | - */ |
657 | | - static class UpdateSnapshotStatusRequestV6 extends TransportRequest { |
658 | | - private Snapshot snapshot; |
659 | | - private ShardId shardId; |
660 | | - private ShardSnapshotStatus status; |
661 | | - |
662 | | - UpdateSnapshotStatusRequestV6() { |
663 | | - |
664 | | - } |
665 | | - |
666 | | - UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { |
667 | | - this.snapshot = snapshot; |
668 | | - this.shardId = shardId; |
669 | | - this.status = status; |
670 | | - } |
671 | | - |
672 | | - @Override |
673 | | - public void readFrom(StreamInput in) throws IOException { |
674 | | - super.readFrom(in); |
675 | | - snapshot = new Snapshot(in); |
676 | | - shardId = ShardId.readShardId(in); |
677 | | - status = new ShardSnapshotStatus(in); |
678 | | - } |
679 | | - |
680 | | - @Override |
681 | | - public void writeTo(StreamOutput out) throws IOException { |
682 | | - super.writeTo(out); |
683 | | - snapshot.writeTo(out); |
684 | | - shardId.writeTo(out); |
685 | | - status.writeTo(out); |
686 | | - } |
687 | | - |
688 | | - Snapshot snapshot() { |
689 | | - return snapshot; |
690 | | - } |
691 | | - |
692 | | - ShardId shardId() { |
693 | | - return shardId; |
694 | | - } |
695 | | - |
696 | | - ShardSnapshotStatus status() { |
697 | | - return status; |
698 | | - } |
699 | | - |
700 | | - @Override |
701 | | - public String toString() { |
702 | | - return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; |
703 | | - } |
704 | | - } |
705 | | - |
706 | | - /** |
707 | | - * A BWC version of {@link UpdateSnapshotStatusAction} |
708 | | - */ |
709 | | - class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler<UpdateSnapshotStatusRequestV6> { |
710 | | - @Override |
711 | | - public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception { |
712 | | - final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status()); |
713 | | - innerUpdateSnapshotState(request, new ActionListener<UpdateIndexShardSnapshotStatusResponse>() { |
714 | | - @Override |
715 | | - public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) { |
716 | | - |
717 | | - } |
718 | | - |
719 | | - @Override |
720 | | - public void onFailure(Exception e) { |
721 | | - logger.warn("Failed to update snapshot status", e); |
722 | | - } |
723 | | - }); |
724 | | - channel.sendResponse(TransportResponse.Empty.INSTANCE); |
725 | | - } |
726 | | - } |
727 | | - |
728 | 633 | } |
0 commit comments