Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -57,7 +56,6 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -93,7 +91,6 @@ public static class Actions {
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
}

Expand All @@ -112,7 +109,7 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool, this::waitForClusterState);
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);

transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
FilesInfoRequestHandler());
Expand All @@ -126,8 +123,6 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
transportService.registerRequestHandler(
Actions.HANDOFF_PRIMARY_CONTEXT,
RecoveryHandoffPrimaryContextRequest::new,
Expand Down Expand Up @@ -452,18 +447,6 @@ public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportCh
}
}

/**
 * Transport handler for {@link RecoveryWaitForClusterStateRequest}. Looks up the ongoing recovery named by the
 * request, asks its target to ensure the requested cluster state version, and then replies with an empty response.
 */
class WaitForClusterStateRequestHandler implements TransportRequestHandler<RecoveryWaitForClusterStateRequest> {

    @Override
    public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel, Task task) throws Exception {
        // The recovery reference is only held while waiting; it is released before the response is sent.
        try (RecoveryRef ref = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
            ref.target().ensureClusterStateVersion(request.clusterStateVersion());
        }
        // Only reached when the wait above completed without throwing.
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
}

class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {

@Override
Expand Down Expand Up @@ -538,46 +521,6 @@ public void onTimeout(TimeValue timeout) {
}
}

/**
 * Blocks the calling thread until the local cluster state has a version of at least {@code clusterStateVersion}.
 * Returns immediately when the current state already satisfies that. Otherwise waits on a cluster state observer
 * created with a five minute timeout.
 *
 * @param clusterStateVersion the minimum cluster state version to wait for
 * @throws RuntimeException if waiting fails (observer timeout, cluster service closed, or any other failure of the
 *                          future), as converted by {@code ExceptionsHelper.convertToRuntime}
 */
private void waitForClusterState(long clusterStateVersion) {
    final ClusterState initialState = clusterService.state();
    // The observer is created up front, before the version check, so its view starts from the state sampled above.
    final ClusterStateObserver stateObserver = new ClusterStateObserver(
        initialState, clusterService, TimeValue.timeValueMinutes(5), logger, threadPool.getThreadContext());

    // Fast path: nothing to wait for.
    if (initialState.getVersion() >= clusterStateVersion) {
        logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion,
            initialState.getVersion());
        return;
    }

    logger.trace("waiting for cluster state version {} (current: {})", clusterStateVersion, initialState.getVersion());
    final PlainActionFuture<Long> versionFuture = new PlainActionFuture<>();

    // Bridges the observer callbacks onto the future this thread blocks on.
    final ClusterStateObserver.Listener bridge = new ClusterStateObserver.Listener() {

        @Override
        public void onNewClusterState(ClusterState state) {
            versionFuture.onResponse(state.getVersion());
        }

        @Override
        public void onClusterServiceClose() {
            versionFuture.onFailure(new NodeClosedException(clusterService.localNode()));
        }

        @Override
        public void onTimeout(TimeValue timeout) {
            versionFuture.onFailure(new IllegalStateException("cluster state never updated to version " + clusterStateVersion));
        }
    };
    stateObserver.waitForNextChange(bridge, newState -> newState.getVersion() >= clusterStateVersion);

    try {
        final long reachedVersion = versionFuture.get();
        logger.trace("successfully waited for cluster state with version {} (current: {})", clusterStateVersion, reachedVersion);
    } catch (Exception e) {
        logger.debug(() -> new ParameterizedMessage(
            "failed waiting for cluster state with version {} (current: {})",
            clusterStateVersion, clusterService.state().getVersion()), e);
        throw ExceptionsHelper.convertToRuntime(e);
    }
}

class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
Expand All @@ -51,12 +50,10 @@ public class RecoveriesCollection {

private final Logger logger;
private final ThreadPool threadPool;
private final LongConsumer ensureClusterStateVersionCallback;

public RecoveriesCollection(Logger logger, ThreadPool threadPool, LongConsumer ensureClusterStateVersionCallback) {
public RecoveriesCollection(Logger logger, ThreadPool threadPool) {
this.logger = logger;
this.threadPool = threadPool;
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
}

/**
Expand All @@ -66,7 +63,7 @@ public RecoveriesCollection(Logger logger, ThreadPool threadPool, LongConsumer e
*/
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode,
PeerRecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener);
startRecoveryInternal(recoveryTarget, activityTimeout);
return recoveryTarget.recoveryId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
Expand All @@ -75,7 +74,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final MultiFileWriter multiFileWriter;
private final Store store;
private final PeerRecoveryTargetService.RecoveryListener listener;
private final LongConsumer ensureClusterStateVersionCallback;

private final AtomicBoolean finished = new AtomicBoolean();

Expand All @@ -93,14 +91,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
* @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided
* version; necessary for primary relocation so that new primary knows about all other ongoing
* replica recoveries when replicating documents (see {@link RecoverySourceHandler})
*/
public RecoveryTarget(final IndexShard indexShard,
final DiscoveryNode sourceNode,
final PeerRecoveryTargetService.RecoveryListener listener,
final LongConsumer ensureClusterStateVersionCallback) {
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener) {
super("recovery_status");
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
Expand All @@ -113,7 +105,6 @@ public RecoveryTarget(final IndexShard indexShard,
this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger,
this::ensureRefCount);
this.store = indexShard.store();
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
// make sure the store is not released until we are done.
store.incRef();
indexShard.recoveryStats().incCurrentAsTarget();
Expand All @@ -125,7 +116,7 @@ public RecoveryTarget(final IndexShard indexShard,
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
return new RecoveryTarget(indexShard, sourceNode, listener);
}

public long recoveryId() {
Expand Down Expand Up @@ -314,11 +305,6 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> l
});
}

// Delegates to the callback that was handed to this recovery target's constructor; this class holds no
// cluster-state logic of its own.
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
ensureClusterStateVersionCallback.accept(clusterStateVersion);
}

@Override
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
indexShard.activateWithPrimaryContext(primaryContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ public interface RecoveryTargetHandler {
*/
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);

/**
* Blocks until a cluster state with a version of at least {@code clusterStateVersion} is available.
*
* @param clusterStateVersion the minimum cluster state version to wait for
*/
void ensureClusterStateVersion(long clusterStateVersion);

/**
* Handoff the primary context between the relocation source and the relocation target.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,6 @@ public void finalizeRecovery(final long globalCheckpoint, final ActionListener<V
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}

/**
 * Sends a {@code WAIT_CLUSTERSTATE} request for this recovery to the target node and blocks on the result
 * via {@code txGet()}. The request uses the long internal-action timeout from the recovery settings.
 *
 * @param clusterStateVersion the cluster state version carried in the request
 */
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
    final RecoveryWaitForClusterStateRequest waitRequest =
        new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion);
    final TransportRequestOptions requestOptions = TransportRequestOptions.builder()
        .withTimeout(recoverySettings.internalActionLongTimeout())
        .build();
    transportService
        .submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE, waitRequest, requestOptions,
            EmptyTransportResponseHandler.INSTANCE_SAME)
        .txGet();
}

@Override
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
transportService.submitRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,19 @@ public void run() {
};
thread.start();
IndexShard replica = shards.addReplica();
Future<Void> future = shards.asyncRecoverReplica(replica, (indexShard, node)
-> new RecoveryTarget(indexShard, node, recoveryListener, version -> {
}) {
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, sourceMetaData);
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
Future<Void> future = shards.asyncRecoverReplica(replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, sourceMetaData);
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
});
});
future.get();
thread.join();
shards.assertAllEqual(numDocs);
Expand Down Expand Up @@ -197,7 +196,7 @@ public IndexResult index(Index op) throws IOException {
thread.start();
IndexShard replica = shards.addReplica();
Future<Void> fut = shards.asyncRecoverReplica(replica,
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener) {
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
ActionListener<Void> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
AtomicBoolean recoveryDone = new AtomicBoolean(false);
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
recoveryStart.countDown();
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
return new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
recoveryDone.set(true);
Expand Down Expand Up @@ -556,7 +556,7 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) {
final IndexShard replica = shards.addReplica();
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(
replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void indexTranslogOperations(
final List<Translog.Operation> operations,
Expand Down Expand Up @@ -811,7 +811,7 @@ public static class BlockingTarget extends RecoveryTarget {
public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery,
IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
Logger logger) {
super(shard, sourceNode, listener, version -> {});
super(shard, sourceNode, listener);
this.recoveryBlocked = recoveryBlocked;
this.releaseRecovery = releaseRecovery;
this.stageToBlock = stageToBlock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2427,8 +2427,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException {
indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}");
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
recoverReplica(replica, primary, (shard, discoveryNode) ->
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
@Override
public void indexTranslogOperations(
final List<Translog.Operation> operations,
Expand Down Expand Up @@ -2550,8 +2549,7 @@ public void testShardActiveDuringPeerRecovery() throws IOException {
// Shard is still inactive since we haven't started recovering yet
assertFalse(replica.isActive());
recoverReplica(replica, primary, (shard, discoveryNode) ->
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
@Override
public void indexTranslogOperations(
final List<Translog.Operation> operations,
Expand Down Expand Up @@ -2605,8 +2603,7 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
assertListenerCalled.accept(replica);
recoverReplica(replica, primary, (shard, discoveryNode) ->
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
// we're only checking that listeners are called when the engine is open, before there is no point
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
Expand Down
Loading