diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index fc8e0c765b5a6..9bb29a14cadcb 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -31,7 +31,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ChannelActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -57,7 +56,6 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogCorruptedException; import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; @@ -93,7 +91,6 @@ public static class Actions { public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops"; public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog"; public static final String FINALIZE = "internal:index/shard/recovery/finalize"; - public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate"; public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context"; } @@ -112,7 +109,7 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo this.transportService = transportService; this.recoverySettings = recoverySettings; this.clusterService = clusterService; - this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool, this::waitForClusterState); + this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool); transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new FilesInfoRequestHandler()); @@ -126,8 +123,6 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo new TranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new FinalizeRecoveryRequestHandler()); - transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new, - ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler()); transportService.registerRequestHandler( Actions.HANDOFF_PRIMARY_CONTEXT, RecoveryHandoffPrimaryContextRequest::new, @@ -452,18 +447,6 @@ public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportCh } } - class WaitForClusterStateRequestHandler implements TransportRequestHandler { - - @Override - public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel, Task task) throws Exception { - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() - )) { - recoveryRef.target().ensureClusterStateVersion(request.clusterStateVersion()); - } - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } - class HandoffPrimaryContextRequestHandler implements TransportRequestHandler { @Override @@ -538,46 +521,6 @@ public void onTimeout(TimeValue timeout) { } } - private void waitForClusterState(long clusterStateVersion) { - final ClusterState clusterState = clusterService.state(); - ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, TimeValue.timeValueMinutes(5), logger, - threadPool.getThreadContext()); - if (clusterState.getVersion() >= clusterStateVersion) { - logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion, - clusterState.getVersion()); - return; - } else { - logger.trace("waiting for cluster state version {} (current: {})", clusterStateVersion, clusterState.getVersion()); - final PlainActionFuture future = new PlainActionFuture<>(); - observer.waitForNextChange(new ClusterStateObserver.Listener() { - - @Override - public void onNewClusterState(ClusterState state) { - future.onResponse(state.getVersion()); - } - - @Override - public void onClusterServiceClose() { - future.onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - future.onFailure(new IllegalStateException("cluster state never updated to version " + clusterStateVersion)); - } - }, newState -> newState.getVersion() >= clusterStateVersion); - try { - long currentVersion = future.get(); - logger.trace("successfully waited for cluster state with version {} (current: {})", clusterStateVersion, currentVersion); - } catch (Exception e) { - logger.debug(() -> new ParameterizedMessage( - "failed waiting for cluster state with version {} (current: {})", - clusterStateVersion, clusterService.state().getVersion()), e); - throw ExceptionsHelper.convertToRuntime(e); - } - } - } - class FilesInfoRequestHandler implements TransportRequestHandler { @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index d08d7b6d0bd98..76b3527073fd3 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongConsumer; /** * This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node @@ -51,12 +50,10 @@ public class RecoveriesCollection { private final Logger logger; private final ThreadPool threadPool; - private final LongConsumer ensureClusterStateVersionCallback; - public RecoveriesCollection(Logger logger, ThreadPool threadPool, LongConsumer ensureClusterStateVersionCallback) { + public RecoveriesCollection(Logger logger, ThreadPool threadPool) { this.logger = logger; this.threadPool = threadPool; - this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback; } /** @@ -66,7 +63,7 @@ public RecoveriesCollection(Logger logger, ThreadPool threadPool, LongConsumer e */ public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) { - RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback); + RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener); startRecoveryInternal(recoveryTarget, activityTimeout); return recoveryTarget.recoveryId(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index b75cb23e9e656..aa119fcd12b4d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -54,7 +54,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.LongConsumer; /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of @@ -75,7 +74,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final MultiFileWriter multiFileWriter; private final Store store; private final PeerRecoveryTargetService.RecoveryListener listener; - private final LongConsumer ensureClusterStateVersionCallback; private final AtomicBoolean finished = new AtomicBoolean(); @@ -93,14 +91,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget * @param indexShard local shard where we want to recover to * @param sourceNode source node of the recovery where we recover from * @param listener called when recovery is completed/failed - * @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided - * version; necessary for primary relocation so that new primary knows about all other ongoing - * replica recoveries when replicating documents (see {@link RecoverySourceHandler}) */ - public RecoveryTarget(final IndexShard indexShard, - final DiscoveryNode sourceNode, - final PeerRecoveryTargetService.RecoveryListener listener, - final LongConsumer ensureClusterStateVersionCallback) { + public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener) { super("recovery_status"); this.cancellableThreads = new CancellableThreads(); this.recoveryId = idGenerator.incrementAndGet(); @@ -113,7 +105,6 @@ public RecoveryTarget(final IndexShard indexShard, this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger, this::ensureRefCount); this.store = indexShard.store(); - this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback; // make sure the store is not released until we are done. store.incRef(); indexShard.recoveryStats().incCurrentAsTarget(); @@ -125,7 +116,7 @@ public RecoveryTarget(final IndexShard indexShard, * @return a copy of this recovery target */ public RecoveryTarget retryCopy() { - return new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback); + return new RecoveryTarget(indexShard, sourceNode, listener); } public long recoveryId() { @@ -314,11 +305,6 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener l }); } - @Override - public void ensureClusterStateVersion(long clusterStateVersion) { - ensureClusterStateVersionCallback.accept(clusterStateVersion); - } - @Override public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) { indexShard.activateWithPrimaryContext(primaryContext); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index be7c00d52c94d..697a168a22027 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -48,11 +48,6 @@ public interface RecoveryTargetHandler { */ void finalizeRecovery(long globalCheckpoint, ActionListener listener); - /** - * Blockingly waits for cluster state with at least clusterStateVersion to be available - */ - void ensureClusterStateVersion(long clusterStateVersion); - /** * Handoff the primary context between the relocation source and the relocation target. * diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index fcedf0a000ae6..357f82a744cf6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -95,14 +95,6 @@ public void finalizeRecovery(final long globalCheckpoint, final ActionListener TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); } - @Override - public void ensureClusterStateVersion(long clusterStateVersion) { - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE, - new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion), - TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); - } - @Override public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) { transportService.submitRequest( diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 1af143c1c98d7..b2e08f8f704fb 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -120,20 +120,19 @@ public void run() { }; thread.start(); IndexShard replica = shards.addReplica(); - Future future = shards.asyncRecoverReplica(replica, (indexShard, node) - -> new RecoveryTarget(indexShard, node, recoveryListener, version -> { - }) { - @Override - public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { - super.cleanFiles(totalTranslogOps, sourceMetaData); - latch.countDown(); - try { - latch.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); + Future future = shards.asyncRecoverReplica(replica, + (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) { + @Override + public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { + super.cleanFiles(totalTranslogOps, sourceMetaData); + latch.countDown(); + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } } - } - }); + }); future.get(); thread.join(); shards.assertAllEqual(numDocs); @@ -197,7 +196,7 @@ public IndexResult index(Index op) throws IOException { thread.start(); IndexShard replica = shards.addReplica(); Future fut = shards.asyncRecoverReplica(replica, - (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){ + (shard, node) -> new RecoveryTarget(shard, node, recoveryListener) { @Override public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 74ded2d749099..d8a7827a8cb48 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -491,7 +491,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { AtomicBoolean recoveryDone = new AtomicBoolean(false); final Future recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> { recoveryStart.countDown(); - return new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { + return new RecoveryTarget(indexShard, node, recoveryListener) { @Override public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { recoveryDone.set(true); @@ -556,7 +556,7 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { final IndexShard replica = shards.addReplica(); final Future recoveryFuture = shards.asyncRecoverReplica( replica, - (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { + (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -811,7 +811,7 @@ public static class BlockingTarget extends RecoveryTarget { public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, Logger logger) { - super(shard, sourceNode, listener, version -> {}); + super(shard, sourceNode, listener); this.recoveryBlocked = recoveryBlocked; this.releaseRecovery = releaseRecovery; this.stageToBlock = stageToBlock; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 6b04abb356268..8e25f85ce5c94 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2427,8 +2427,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}"); IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { - }) { + new RecoveryTarget(shard, discoveryNode, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -2550,8 +2549,7 @@ public void testShardActiveDuringPeerRecovery() throws IOException { // Shard is still inactive since we haven't started recovering yet assertFalse(replica.isActive()); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { - }) { + new RecoveryTarget(shard, discoveryNode, recoveryListener) { @Override public void indexTranslogOperations( final List operations, @@ -2605,8 +2603,7 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); assertListenerCalled.accept(replica); recoverReplica(replica, primary, (shard, discoveryNode) -> - new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { - }) { + new RecoveryTarget(shard, discoveryNode, recoveryListener) { // we're only checking that listeners are called when the engine is open, before there is no point @Override public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 41ea9a8bea74b..a936816b96897 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -61,7 +61,7 @@ public void testGetStartingSeqNo() throws Exception { // Empty store { recoveryEmptyReplica(replica, true); - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L)); recoveryTarget.decRef(); } @@ -77,7 +77,7 @@ public void testGetStartingSeqNo() throws Exception { flushShard(replica); replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test"); replica.sync(); - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); recoveryTarget.decRef(); } @@ -91,7 +91,7 @@ public void testGetStartingSeqNo() throws Exception { } } flushShard(replica); - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); recoveryTarget.decRef(); } @@ -99,7 +99,7 @@ public void testGetStartingSeqNo() throws Exception { { replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test"); replica.sync(); - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs)); recoveryTarget.decRef(); } @@ -118,7 +118,7 @@ public void testGetStartingSeqNo() throws Exception { writer.setLiveCommitData(userData.entrySet()); writer.commit(); } - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); recoveryTarget.decRef(); } @@ -143,7 +143,7 @@ public void testWriteFileChunksConcurrently() throws Exception { final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId()); targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode)); - final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null); + final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null); recoveryTarget.receiveFileInfo( mdFiles.stream().map(StoreFileMetaData::name).collect(Collectors.toList()), mdFiles.stream().map(StoreFileMetaData::length).collect(Collectors.toList()), diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index ff70b05e99ffe..38c6179224a50 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -695,10 +695,6 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { } - @Override - public void ensureClusterStateVersion(long clusterStateVersion) { - } - @Override public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) { } diff --git a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index f420b4e025e9c..a16fd5d95b431 100644 --- a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -161,7 +161,6 @@ public static BlockClusterStateProcessing relocateAndBlockCompletion(Logger logg BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(nodeTo, random()); internalCluster().setDisruptionScheme(disruption); MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeTo); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeTo); CountDownLatch beginRelocationLatch = new CountDownLatch(1); CountDownLatch receivedShardExistsRequestLatch = new CountDownLatch(1); // use a tracer on the target node to track relocation start and end @@ -177,18 +176,6 @@ public void receivedRequest(long requestId, String action) { // to the other nodes that should have a copy according to cluster state. receivedShardExistsRequestLatch.countDown(); logger.info("received: {}, relocation done", action); - } else if (action.equals(PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE)) { - logger.info("received: {}, waiting on cluster state", action); - // ensure that relocation target node is on the same cluster state as relocation source before proceeding with - // this request. If the target does not have the relocating cluster state exposed through ClusterService.state(), - // then waitForClusterState will have to register a ClusterObserver with the ClusterService, which can cause - // a race with the BlockClusterStateProcessing block that is added below. - try { - assertBusy(() -> assertTrue( - clusterService.state().routingTable().index(index).shard(shard).primaryShard().relocating())); - } catch (Exception e) { - throw new RuntimeException(e); - } } } }); diff --git a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index ca7290f5f7a85..a3b908fae5c9d 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -53,7 +53,7 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo public void testLastAccessTimeUpdate() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) { final long lastSeenTime = status.target().lastAccessTime(); @@ -70,7 +70,7 @@ public void testLastAccessTimeUpdate() throws Exception { public void testRecoveryTimeout() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); final AtomicBoolean failed = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), @@ -98,7 +98,7 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo public void testRecoveryCancellation() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) { @@ -117,7 +117,7 @@ public void testResetRecovery() throws Exception { shards.startAll(); int numDocs = randomIntBetween(1, 15); shards.indexDocs(numDocs); - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); IndexShard shard = shards.addReplica(); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard); RecoveryTarget recoveryTarget = collection.getRecoveryTarget(recoveryId); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 37a18ef67ec06..d6d160f01ac84 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -410,7 +410,7 @@ public synchronized boolean removeReplica(IndexShard replica) throws IOException } public void recoverReplica(IndexShard replica) throws IOException { - recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {})); + recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener)); } public void recoverReplica(IndexShard replica, BiFunction targetSupplier) diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 0ba60ba872e9a..5b5ff8de01d03 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -569,8 +569,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) { /** recovers a replica from the given primary **/ protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { recoverReplica(replica, primary, - (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> { - }), + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener), true, startReplica); } diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java index e845ab41b8af8..ec34d7e27a3b3 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java @@ -45,11 +45,6 @@ public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) { this.target = target; } - @Override - public void ensureClusterStateVersion(long clusterStateVersion) { - target.ensureClusterStateVersion(clusterStateVersion); - } - @Override public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index dd495ee5ca587..e42ce17ad895e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -358,7 +358,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { // We need to recover the replica async to release the main thread for the following task to fill missing // operations between the local checkpoint and max_seq_no which the recovering replica is waiting for. recoveryFuture = group.asyncRecoverReplica(newReplica, - (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener, l -> {}) {}); + (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener) {}); } } if (recoveryFuture != null) {