Skip to content

Commit 38e9522

Browse files
committed
Remove wait for cluster state step in peer recovery (#40004)
We introduced WAIT_CLUSTERSTATE action in #19287 (5.0), but then stopped using it since #25692 (6.0). This change removes that action and related code in 7.x and 8.0. Relates #19287 Relates #25692
1 parent 9ba0bdf commit 38e9522

File tree

16 files changed

+37
-151
lines changed

16 files changed

+37
-151
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.ExceptionsHelper;
3232
import org.elasticsearch.action.ActionListener;
3333
import org.elasticsearch.action.support.ChannelActionListener;
34-
import org.elasticsearch.action.support.PlainActionFuture;
3534
import org.elasticsearch.cluster.ClusterState;
3635
import org.elasticsearch.cluster.ClusterStateObserver;
3736
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -57,7 +56,6 @@
5756
import org.elasticsearch.index.translog.Translog;
5857
import org.elasticsearch.index.translog.TranslogCorruptedException;
5958
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
60-
import org.elasticsearch.node.NodeClosedException;
6159
import org.elasticsearch.tasks.Task;
6260
import org.elasticsearch.threadpool.ThreadPool;
6361
import org.elasticsearch.transport.ConnectTransportException;
@@ -93,7 +91,6 @@ public static class Actions {
9391
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
9492
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
9593
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
96-
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
9794
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
9895
}
9996

@@ -112,7 +109,7 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
112109
this.transportService = transportService;
113110
this.recoverySettings = recoverySettings;
114111
this.clusterService = clusterService;
115-
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool, this::waitForClusterState);
112+
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
116113

117114
transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
118115
FilesInfoRequestHandler());
@@ -126,8 +123,6 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
126123
new TranslogOperationsRequestHandler());
127124
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
128125
FinalizeRecoveryRequestHandler());
129-
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
130-
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
131126
transportService.registerRequestHandler(
132127
Actions.HANDOFF_PRIMARY_CONTEXT,
133128
RecoveryHandoffPrimaryContextRequest::new,
@@ -452,18 +447,6 @@ public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportCh
452447
}
453448
}
454449

455-
class WaitForClusterStateRequestHandler implements TransportRequestHandler<RecoveryWaitForClusterStateRequest> {
456-
457-
@Override
458-
public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel, Task task) throws Exception {
459-
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
460-
)) {
461-
recoveryRef.target().ensureClusterStateVersion(request.clusterStateVersion());
462-
}
463-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
464-
}
465-
}
466-
467450
class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {
468451

469452
@Override
@@ -538,46 +521,6 @@ public void onTimeout(TimeValue timeout) {
538521
}
539522
}
540523

541-
private void waitForClusterState(long clusterStateVersion) {
542-
final ClusterState clusterState = clusterService.state();
543-
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, TimeValue.timeValueMinutes(5), logger,
544-
threadPool.getThreadContext());
545-
if (clusterState.getVersion() >= clusterStateVersion) {
546-
logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion,
547-
clusterState.getVersion());
548-
return;
549-
} else {
550-
logger.trace("waiting for cluster state version {} (current: {})", clusterStateVersion, clusterState.getVersion());
551-
final PlainActionFuture<Long> future = new PlainActionFuture<>();
552-
observer.waitForNextChange(new ClusterStateObserver.Listener() {
553-
554-
@Override
555-
public void onNewClusterState(ClusterState state) {
556-
future.onResponse(state.getVersion());
557-
}
558-
559-
@Override
560-
public void onClusterServiceClose() {
561-
future.onFailure(new NodeClosedException(clusterService.localNode()));
562-
}
563-
564-
@Override
565-
public void onTimeout(TimeValue timeout) {
566-
future.onFailure(new IllegalStateException("cluster state never updated to version " + clusterStateVersion));
567-
}
568-
}, newState -> newState.getVersion() >= clusterStateVersion);
569-
try {
570-
long currentVersion = future.get();
571-
logger.trace("successfully waited for cluster state with version {} (current: {})", clusterStateVersion, currentVersion);
572-
} catch (Exception e) {
573-
logger.debug(() -> new ParameterizedMessage(
574-
"failed waiting for cluster state with version {} (current: {})",
575-
clusterStateVersion, clusterService.state().getVersion()), e);
576-
throw ExceptionsHelper.convertToRuntime(e);
577-
}
578-
}
579-
}
580-
581524
class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
582525

583526
@Override

server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.util.List;
3737
import java.util.concurrent.ConcurrentMap;
3838
import java.util.concurrent.atomic.AtomicBoolean;
39-
import java.util.function.LongConsumer;
4039

4140
/**
4241
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
@@ -51,12 +50,10 @@ public class RecoveriesCollection {
5150

5251
private final Logger logger;
5352
private final ThreadPool threadPool;
54-
private final LongConsumer ensureClusterStateVersionCallback;
5553

56-
public RecoveriesCollection(Logger logger, ThreadPool threadPool, LongConsumer ensureClusterStateVersionCallback) {
54+
public RecoveriesCollection(Logger logger, ThreadPool threadPool) {
5755
this.logger = logger;
5856
this.threadPool = threadPool;
59-
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
6057
}
6158

6259
/**
@@ -66,7 +63,7 @@ public RecoveriesCollection(Logger logger, ThreadPool threadPool, LongConsumer e
6663
*/
6764
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode,
6865
PeerRecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) {
69-
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
66+
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener);
7067
startRecoveryInternal(recoveryTarget, activityTimeout);
7168
return recoveryTarget.recoveryId();
7269
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import java.util.concurrent.CountDownLatch;
5555
import java.util.concurrent.atomic.AtomicBoolean;
5656
import java.util.concurrent.atomic.AtomicLong;
57-
import java.util.function.LongConsumer;
5857

5958
/**
6059
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
@@ -75,7 +74,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
7574
private final MultiFileWriter multiFileWriter;
7675
private final Store store;
7776
private final PeerRecoveryTargetService.RecoveryListener listener;
78-
private final LongConsumer ensureClusterStateVersionCallback;
7977

8078
private final AtomicBoolean finished = new AtomicBoolean();
8179

@@ -93,14 +91,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
9391
* @param indexShard local shard where we want to recover to
9492
* @param sourceNode source node of the recovery where we recover from
9593
* @param listener called when recovery is completed/failed
96-
* @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided
97-
* version; necessary for primary relocation so that new primary knows about all other ongoing
98-
* replica recoveries when replicating documents (see {@link RecoverySourceHandler})
9994
*/
100-
public RecoveryTarget(final IndexShard indexShard,
101-
final DiscoveryNode sourceNode,
102-
final PeerRecoveryTargetService.RecoveryListener listener,
103-
final LongConsumer ensureClusterStateVersionCallback) {
95+
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener) {
10496
super("recovery_status");
10597
this.cancellableThreads = new CancellableThreads();
10698
this.recoveryId = idGenerator.incrementAndGet();
@@ -113,7 +105,6 @@ public RecoveryTarget(final IndexShard indexShard,
113105
this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger,
114106
this::ensureRefCount);
115107
this.store = indexShard.store();
116-
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
117108
// make sure the store is not released until we are done.
118109
store.incRef();
119110
indexShard.recoveryStats().incCurrentAsTarget();
@@ -125,7 +116,7 @@ public RecoveryTarget(final IndexShard indexShard,
125116
* @return a copy of this recovery target
126117
*/
127118
public RecoveryTarget retryCopy() {
128-
return new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
119+
return new RecoveryTarget(indexShard, sourceNode, listener);
129120
}
130121

131122
public long recoveryId() {
@@ -314,11 +305,6 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> l
314305
});
315306
}
316307

317-
@Override
318-
public void ensureClusterStateVersion(long clusterStateVersion) {
319-
ensureClusterStateVersionCallback.accept(clusterStateVersion);
320-
}
321-
322308
@Override
323309
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
324310
indexShard.activateWithPrimaryContext(primaryContext);

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,6 @@ public interface RecoveryTargetHandler {
4848
*/
4949
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
5050

51-
/**
52-
* Blockingly waits for cluster state with at least clusterStateVersion to be available
53-
*/
54-
void ensureClusterStateVersion(long clusterStateVersion);
55-
5651
/**
5752
* Handoff the primary context between the relocation source and the relocation target.
5853
*

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,6 @@ public void finalizeRecovery(final long globalCheckpoint, final ActionListener<V
9595
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
9696
}
9797

98-
@Override
99-
public void ensureClusterStateVersion(long clusterStateVersion) {
100-
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE,
101-
new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion),
102-
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
103-
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
104-
}
105-
10698
@Override
10799
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
108100
transportService.submitRequest(

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -120,20 +120,19 @@ public void run() {
120120
};
121121
thread.start();
122122
IndexShard replica = shards.addReplica();
123-
Future<Void> future = shards.asyncRecoverReplica(replica, (indexShard, node)
124-
-> new RecoveryTarget(indexShard, node, recoveryListener, version -> {
125-
}) {
126-
@Override
127-
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
128-
super.cleanFiles(totalTranslogOps, sourceMetaData);
129-
latch.countDown();
130-
try {
131-
latch.await();
132-
} catch (InterruptedException e) {
133-
throw new AssertionError(e);
123+
Future<Void> future = shards.asyncRecoverReplica(replica,
124+
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
125+
@Override
126+
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
127+
super.cleanFiles(totalTranslogOps, sourceMetaData);
128+
latch.countDown();
129+
try {
130+
latch.await();
131+
} catch (InterruptedException e) {
132+
throw new AssertionError(e);
133+
}
134134
}
135-
}
136-
});
135+
});
137136
future.get();
138137
thread.join();
139138
shards.assertAllEqual(numDocs);
@@ -197,7 +196,7 @@ public IndexResult index(Index op) throws IOException {
197196
thread.start();
198197
IndexShard replica = shards.addReplica();
199198
Future<Void> fut = shards.asyncRecoverReplica(replica,
200-
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
199+
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener) {
201200
@Override
202201
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
203202
ActionListener<Void> listener) {

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
491491
AtomicBoolean recoveryDone = new AtomicBoolean(false);
492492
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
493493
recoveryStart.countDown();
494-
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
494+
return new RecoveryTarget(indexShard, node, recoveryListener) {
495495
@Override
496496
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
497497
recoveryDone.set(true);
@@ -556,7 +556,7 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) {
556556
final IndexShard replica = shards.addReplica();
557557
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(
558558
replica,
559-
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
559+
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
560560
@Override
561561
public void indexTranslogOperations(
562562
final List<Translog.Operation> operations,
@@ -811,7 +811,7 @@ public static class BlockingTarget extends RecoveryTarget {
811811
public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery,
812812
IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
813813
Logger logger) {
814-
super(shard, sourceNode, listener, version -> {});
814+
super(shard, sourceNode, listener);
815815
this.recoveryBlocked = recoveryBlocked;
816816
this.releaseRecovery = releaseRecovery;
817817
this.stageToBlock = stageToBlock;

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2427,8 +2427,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException {
24272427
indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}");
24282428
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
24292429
recoverReplica(replica, primary, (shard, discoveryNode) ->
2430-
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
2431-
}) {
2430+
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
24322431
@Override
24332432
public void indexTranslogOperations(
24342433
final List<Translog.Operation> operations,
@@ -2550,8 +2549,7 @@ public void testShardActiveDuringPeerRecovery() throws IOException {
25502549
// Shard is still inactive since we haven't started recovering yet
25512550
assertFalse(replica.isActive());
25522551
recoverReplica(replica, primary, (shard, discoveryNode) ->
2553-
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
2554-
}) {
2552+
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
25552553
@Override
25562554
public void indexTranslogOperations(
25572555
final List<Translog.Operation> operations,
@@ -2605,8 +2603,7 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
26052603
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
26062604
assertListenerCalled.accept(replica);
26072605
recoverReplica(replica, primary, (shard, discoveryNode) ->
2608-
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
2609-
}) {
2606+
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
26102607
// we're only checking that listeners are called when the engine is open, before there is no point
26112608
@Override
26122609
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {

0 commit comments

Comments
 (0)