diff --git a/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java new file mode 100644 index 0000000000000..fab8850d162b9 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.collect; + +/** + * An immutable pair holding an object of type {@code T} and a primitive {@code long}. + * + * @param <T> the type of the first component + */ +public class LongTuple<T> { + + public static <T> LongTuple<T> tuple(final T v1, final long v2) { + return new LongTuple<>(v1, v2); + } + + private final T v1; + private final long v2; + + private LongTuple(final T v1, final long v2) { + this.v1 = v1; + this.v2 = v2; + } + + public T v1() { + return v1; + } + + public long v2() { + return v2; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LongTuple<?> tuple = (LongTuple<?>) o; + + return (v1 == null ? tuple.v1 == null : v1.equals(tuple.v1)) && (v2 == tuple.v2); + } + + @Override + public int hashCode() { + int result = v1 != null ? 
v1.hashCode() : 0; + result = 31 * result + Long.hashCode(v2); + return result; + } + + @Override + public String toString() { + return "Tuple [v1=" + v1 + ", v2=" + v2 + "]"; + } + +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java index ea6edef7a12fa..aeafbc1110850 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java @@ -23,13 +23,20 @@ import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.LongTuple; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or @@ -42,6 +49,8 @@ */ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { + long appliedClusterStateVersion; + /* * This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed * through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. 
This map is keyed by @@ -68,6 +77,12 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { */ private long globalCheckpoint; + /* + * During relocation handoff, the state of the global checkpoint tracker is sampled. After sampling, there should be no additional + * mutations to this tracker until the handoff has completed. + */ + private boolean sealed = false; + /** * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. @@ -94,6 +109,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * @param localCheckpoint the local checkpoint for the shard */ public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } final boolean updated; if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) { updated = true; @@ -210,11 +228,18 @@ synchronized void updateGlobalCheckpointOnReplica(final long globalCheckpoint) { /** * Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed. 
* - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ public synchronized void updateAllocationIdsFromMaster( - final Set activeAllocationIds, final Set initializingAllocationIds) { + final long applyingClusterStateVersion, final Set activeAllocationIds, final Set initializingAllocationIds) { + if (applyingClusterStateVersion < appliedClusterStateVersion) { + return; + } + + appliedClusterStateVersion = applyingClusterStateVersion; + // remove shards whose allocation ID no longer exists inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a)); @@ -248,6 +273,135 @@ public synchronized void updateAllocationIdsFromMaster( updateGlobalCheckpointOnPrimary(); } + /** + * Get the primary context for the shard. This includes the state of the global checkpoint tracker. + * + * @return the primary context + */ + synchronized PrimaryContext primaryContext() { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } + sealed = true; + final ObjectLongMap inSyncLocalCheckpoints = new ObjectLongHashMap<>(this.inSyncLocalCheckpoints); + final ObjectLongMap trackingLocalCheckpoints = new ObjectLongHashMap<>(this.trackingLocalCheckpoints); + return new PrimaryContext(appliedClusterStateVersion, inSyncLocalCheckpoints, trackingLocalCheckpoints); + } + + /** + * Releases a previously acquired primary context. 
+ */ + synchronized void releasePrimaryContext() { + assert sealed; + sealed = false; + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. + * + * @param primaryContext the primary context + */ + synchronized void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } + /* + * We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation + * target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary + * context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define + * the following values: + * - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the + * relocation target + * - version(context) = the cluster state version on the relocation source when the primary context was sampled + * - version(target) = the current cluster state version on the relocation target + * + * We know that version(source) <= version(target) and version(context) < version(target), version(context) = version(target), and + * version(target) < version(context) are all possibilities. + * + * The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing + * shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target + * receives from the relocation source via the primary context. + * + * Let us now consider the case that version(context) < version(target). 
In this case, the active allocation IDs in the primary + * context can be a superset of the active allocation IDs contained in the applied cluster state. This is because no new shards can + * have been started as marking a shard as in-sync is blocked during relocation handoff. Note however that the relocation target + * itself will have been marked in-sync during recovery and therefore is an active allocation ID from the perspective of the primary + * context. + * + * Finally, we consider the case that version(target) < version(context). In this case, the active allocation IDs in the primary + * context can be a subset of the active allocation IDs contained in the applied cluster state. This is again because no new shards can + * have been started. Moreover, existing active allocation IDs could have been removed from the cluster state. + * + * In each of these latter two cases, consider initializing shards that are contained in the primary context but not contained in + * the cluster state applied on the target. + * + * If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already + * applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to + * GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe. + * + * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update that has not + * yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target). + * Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such, + * these shards are not problematic. 
+ * + * Lastly, again in these two cases, what about initializing shards that are contained in the cluster state applied on the target but + * not contained in the primary context? + * + * If version(context) < version(target) it means that a shard has started initializing by a later cluster state that is applied on + * the target but not yet known to what would be the relocation source. As recoveries are delayed at this time, these shards can not + * cause a problem and we do not remove these shards from the tracking map, so we are safe here. + * + * If version(target) < version(context) it means that a shard has started initializing but was removed by a later cluster state. In + * this case, as the cluster state version on the primary context exceeds the applied cluster state version, we replace the tracking + * map and are safe here too. + */ + + assert StreamSupport + .stream(inSyncLocalCheckpoints.spliterator(), false) + .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : inSyncLocalCheckpoints; + assert StreamSupport + .stream(trackingLocalCheckpoints.spliterator(), false) + .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : trackingLocalCheckpoints; + assert pendingInSync.isEmpty() : pendingInSync; + + if (primaryContext.clusterStateVersion() > appliedClusterStateVersion) { + final Set activeAllocationIds = + new HashSet<>(Arrays.asList(primaryContext.inSyncLocalCheckpoints().keys().toArray(String.class))); + final Set initializingAllocationIds = + new HashSet<>(Arrays.asList(primaryContext.trackingLocalCheckpoints().keys().toArray(String.class))); + updateAllocationIdsFromMaster(primaryContext.clusterStateVersion(), activeAllocationIds, initializingAllocationIds); + } + + /* + * As we are updating the local checkpoints for the in-sync allocation IDs, the global checkpoint will advance in place; this means + * that we have to sort the incoming local checkpoints from smallest to largest lest we 
violate that the global checkpoint does not + * regress. + */ + final List> inSync = + StreamSupport + .stream(primaryContext.inSyncLocalCheckpoints().spliterator(), false) + .map(e -> LongTuple.tuple(e.key, e.value)) + .collect(Collectors.toList()); + + inSync.sort(Comparator.comparingLong(LongTuple::v2)); + + for (final LongTuple cursor : inSync) { + assert cursor.v2() >= globalCheckpoint + : "local checkpoint [" + cursor.v2() + "] " + + "for allocation ID [" + cursor.v1() + "] " + + "violates being at least the global checkpoint [" + globalCheckpoint + "]"; + updateLocalCheckpoint(cursor.v1(), cursor.v2()); + if (trackingLocalCheckpoints.containsKey(cursor.v1())) { + moveAllocationIdFromTrackingToInSync(cursor.v1(), "relocation"); + updateGlobalCheckpointOnPrimary(); + } + } + + for (final ObjectLongCursor cursor : primaryContext.trackingLocalCheckpoints()) { + updateLocalCheckpoint(cursor.key, cursor.value); + } + } + /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint * on the specified shard advances above the current global checkpoint. 
@@ -258,6 +412,9 @@ public synchronized void updateAllocationIdsFromMaster( * @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance */ public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } if (!trackingLocalCheckpoints.containsKey(allocationId)) { /* * This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing @@ -295,15 +452,13 @@ private synchronized void waitForAllocationIdToBeInSync(final String allocationI */ final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE); if (current >= globalCheckpoint) { - logger.trace("marked [{}] as in-sync with local checkpoint [{}]", allocationId, current); - trackingLocalCheckpoints.remove(allocationId); /* * This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and could * still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint because the * shard never recovered and we would have to wait until either the recovery retries and completes successfully, or the * master fails the shard and issues a cluster state update that removes the shard from the set of active allocation IDs. */ - inSyncLocalCheckpoints.put(allocationId, current); + moveAllocationIdFromTrackingToInSync(allocationId, "recovery"); break; } else { waitForLocalCheckpointToAdvance(); @@ -311,6 +466,21 @@ private synchronized void waitForAllocationIdToBeInSync(final String allocationI } } + /** + * Moves a tracking allocation ID to be in-sync. 
This can occur when a shard is recovering from the primary and its local checkpoint has + * advanced past the global checkpoint, or during relocation hand-off when the relocation target learns of an in-sync shard from the + * relocation source. + * + * @param allocationId the allocation ID to move + * @param reason the reason for the transition + */ + private synchronized void moveAllocationIdFromTrackingToInSync(final String allocationId, final String reason) { + assert trackingLocalCheckpoints.containsKey(allocationId); + final long current = trackingLocalCheckpoints.remove(allocationId); + inSyncLocalCheckpoints.put(allocationId, current); + logger.trace("marked [{}] as in-sync with local checkpoint [{}] due to [{}]", allocationId, current, reason); + } + /** * Wait for the local checkpoint to advance to the global checkpoint. * @@ -324,12 +494,21 @@ private synchronized void waitForLocalCheckpointToAdvance() throws InterruptedEx /** * Check if there are any recoveries pending in-sync. * - * @return {@code true} if there is at least one shard pending in-sync, otherwise false + * @return true if there is at least one shard pending in-sync, otherwise false */ - public boolean pendingInSync() { + boolean pendingInSync() { return !pendingInSync.isEmpty(); } + /** + * Check if the tracker is sealed. + * + * @return true if the tracker is sealed, otherwise false. + */ + boolean sealed() { + return sealed; + } + /** * Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if * the shard is not in-sync. 
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 4180c7e0f7d92..6d8b87599a125 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import java.util.Set; @@ -165,13 +166,24 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) { /** * Notifies the service of the current allocation IDs in the cluster state. See - * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. + * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. * - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ - public void updateAllocationIdsFromMaster(final Set activeAllocationIds, final Set initializingAllocationIds) { - globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); + public void updateAllocationIdsFromMaster( + final long applyingClusterStateVersion, final Set activeAllocationIds, final Set initializingAllocationIds) { + globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, 
initializingAllocationIds); + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. + * + * @param primaryContext the sequence number context + */ + public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext); } /** @@ -183,4 +195,20 @@ public boolean pendingInSync() { return globalCheckpointTracker.pendingInSync(); } + /** + * Get the primary context for the shard. This includes the state of the global checkpoint tracker. + * + * @return the primary context + */ + public PrimaryContext primaryContext() { + return globalCheckpointTracker.primaryContext(); + } + + /** + * Releases a previously acquired primary context. + */ + public void releasePrimaryContext() { + globalCheckpointTracker.releasePrimaryContext(); + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 71efacf7dcfd8..13ced02f6b88a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -515,31 +515,44 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean(); - public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { + /** + * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided + * {@link Consumer} is invoked with the sampled primary context after all operations are successfully blocked. If handing off the context + * or changing state fails, the primary context is released and the failure is rethrown. 
+ * + * @param reason the reason for the relocation + * @param consumer a {@link Consumer} that is handed the primary context after operations are blocked + * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation + * @throws InterruptedException if blocking operations is interrupted + */ + public void relocated( + final String reason, final Consumer<PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == 0 : - "in-flight operations in progress while moving shard state to relocated"; - synchronized (mutex) { - if (state != IndexShardState.STARTED) { - throw new IndexShardNotStartedException(shardId, state); - } - // if the master cancelled the recovery, the target will be removed - // and the recovery will stopped. - // However, it is still possible that we concurrently end up here - // and therefore have to protect we don't mark the shard as relocated when - // its shard routing says otherwise. - if (shardRouting.relocating() == false) { - throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, - ": shard is no longer relocating " + shardRouting); - } - if (primaryReplicaResyncInProgress.get()) { - throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, - ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); + "in-flight operations in progress while moving shard state to relocated"; + /* + * We should not invoke the consumer under the mutex as the expected implementation is to handoff the primary context via a + * network operation. 
Doing this under the mutex can implicitly block the cluster state update thread on network operations. + */ + verifyRelocatingState(); + final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext(); + try { + consumer.accept(primaryContext); + synchronized (mutex) { + verifyRelocatingState(); + changeState(IndexShardState.RELOCATED, reason); } - changeState(IndexShardState.RELOCATED, reason); + } catch (final Exception e) { + // release the sampled context (unsealing the tracker) and surface the original failure instead of swallowing it + try { + getEngine().seqNoService().releasePrimaryContext(); + } catch (final Exception inner) { + e.addSuppressed(inner); + } + throw e; } }); } catch (TimeoutException e) { @@ -551,6 +564,26 @@ public void relocated(String reason) throws IllegalIndexShardStateException, Int } } + private void verifyRelocatingState() { + if (state != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(shardId, state); + } + /* + * If the master cancelled recovery, the target will be removed and the recovery will be cancelled. However, it is still possible + * that we concurrently end up here and therefore have to protect that we do not mark the shard as relocated when its shard routing + * says otherwise. 
+ */ + + if (shardRouting.relocating() == false) { + throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, + ": shard is no longer relocating " + shardRouting); + } + + if (primaryReplicaResyncInProgress.get()) { + throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, + ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); + } + } public IndexShardState state() { return state; @@ -1319,7 +1345,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn private void verifyPrimary() { if (shardRouting.primary() == false) { - throw new IllegalStateException("shard is not a primary " + shardRouting); + throw new IllegalStateException("shard " + shardRouting + " is not a primary"); } } @@ -1327,8 +1353,8 @@ private void verifyReplicationTarget() { final IndexShardState state = state(); if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) { // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException - throw new IllegalStateException("active primary shard cannot be a replication target before " + - " relocation hand off " + shardRouting + ", state is [" + state + "]"); + throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " + + "relocation hand off, state is [" + state + "]"); } } @@ -1603,8 +1629,8 @@ public void markAllocationIdAsInSync(final String allocationId, final long local verifyPrimary(); getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); /* - * We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the - * replica; mark our self as active to force a future background sync. 
+ * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to + * the replica; mark our self as active to force a future background sync. */ active.compareAndSet(false, true); } @@ -1654,18 +1680,34 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) { /** * Notifies the service of the current allocation IDs in the cluster state. See - * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} + * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} * for details. * - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ - public void updateAllocationIdsFromMaster(final Set activeAllocationIds, final Set initializingAllocationIds) { + public void updateAllocationIdsFromMaster( + final long applyingClusterStateVersion, final Set activeAllocationIds, final Set initializingAllocationIds) { verifyPrimary(); final Engine engine = getEngineOrNull(); // if the engine is not yet started, we are not ready yet and can just ignore this if (engine != null) { - engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); + engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); + } + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. 
+ * + * @param primaryContext the sequence number context + */ + public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + verifyPrimary(); + assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting; + final Engine engine = getEngineOrNull(); + if (engine != null) { + engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java new file mode 100644 index 0000000000000..8a067d3718159 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.shard; + +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.ObjectLongMap; +import com.carrotsearch.hppc.cursors.ObjectLongCursor; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing + * shards and their local checkpoints. + */ +public class PrimaryContext implements Writeable { + + private long clusterStateVersion; + + public long clusterStateVersion() { + return clusterStateVersion; + } + + private ObjectLongMap inSyncLocalCheckpoints; + + public ObjectLongMap inSyncLocalCheckpoints() { + return inSyncLocalCheckpoints; + } + + private ObjectLongMap trackingLocalCheckpoints; + + public ObjectLongMap trackingLocalCheckpoints() { + return trackingLocalCheckpoints; + } + + public PrimaryContext( + final long clusterStateVersion, + final ObjectLongMap inSyncLocalCheckpoints, + final ObjectLongMap trackingLocalCheckpoints) { + this.clusterStateVersion = clusterStateVersion; + this.inSyncLocalCheckpoints = inSyncLocalCheckpoints; + this.trackingLocalCheckpoints = trackingLocalCheckpoints; + } + + public PrimaryContext(final StreamInput in) throws IOException { + clusterStateVersion = in.readVLong(); + inSyncLocalCheckpoints = readMap(in); + trackingLocalCheckpoints = readMap(in); + } + + private static ObjectLongMap readMap(final StreamInput in) throws IOException { + final int length = in.readVInt(); + final ObjectLongMap map = new ObjectLongHashMap<>(length); + for (int i = 0; i < length; i++) { + final String key = in.readString(); + final long value = in.readZLong(); + map.addTo(key, value); + } + return map; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + 
out.writeVLong(clusterStateVersion); + writeMap(out, inSyncLocalCheckpoints); + writeMap(out, trackingLocalCheckpoints); + } + + private static void writeMap(final StreamOutput out, final ObjectLongMap map) throws IOException { + out.writeVInt(map.size()); + for (ObjectLongCursor cursor : map) { + out.writeString(cursor.key); + out.writeZLong(cursor.value); + } + } + + @Override + public String toString() { + return "PrimaryContext{" + + "clusterStateVersion=" + clusterStateVersion + + ", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints + + ", trackingLocalCheckpoints=" + trackingLocalCheckpoints + + '}'; + } + +} diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 385b342efbe88..81c0f601e1cc9 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -571,7 +571,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()), primaryReplicaSyncer::resync); - shard.updateAllocationIdsFromMaster(activeIds, initializingIds); + shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds); } } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); @@ -758,12 +758,14 @@ void updatePrimaryTerm(long primaryTerm, /** * Notifies the service of the current allocation ids in the cluster state. - * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. 
+ * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. * - * @param activeAllocationIds the allocation ids of the currently active shard copies - * @param initializingAllocationIds the allocation ids of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation ids of the currently active shard copies + * @param initializingAllocationIds the allocation ids of the currently initializing shard copies */ - void updateAllocationIdsFromMaster(Set activeAllocationIds, Set initializingAllocationIds); + void updateAllocationIdsFromMaster( + long applyingClusterStateVersion, Set activeAllocationIds, Set initializingAllocationIds); } public interface AllocatedIndex extends Iterable, IndexComponent { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 4823edcc2f119..37ab2798b1f4b 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -82,6 +82,7 @@ public static class Actions { public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog"; public static final String FINALIZE = "internal:index/shard/recovery/finalize"; public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate"; + public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/hand_off_primary_context"; } private final ThreadPool threadPool; @@ -116,6 +117,11 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans FinalizeRecoveryRequestHandler()); transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, 
RecoveryWaitForClusterStateRequest::new, ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler()); + transportService.registerRequestHandler( + Actions.HANDOFF_PRIMARY_CONTEXT, + RecoveryHandoffPrimaryContextRequest::new, + ThreadPool.Names.GENERIC, + new HandoffPrimaryContextRequestHandler()); } @Override @@ -411,6 +417,18 @@ public void messageReceived(RecoveryWaitForClusterStateRequest request, Transpor } } + class HandoffPrimaryContextRequestHandler implements TransportRequestHandler { + + @Override + public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel) throws Exception { + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { + recoveryRef.target().handoffPrimaryContext(request.primaryContext()); + } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + } + class TranslogOperationsRequestHandler implements TransportRequestHandler { @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java new file mode 100644 index 0000000000000..6646f6cea5d41 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.PrimaryContext; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +/** + * The request object to handoff the primary context to the relocation target. + */ +class RecoveryHandoffPrimaryContextRequest extends TransportRequest { + + private long recoveryId; + private ShardId shardId; + private PrimaryContext primaryContext; + + /** + * Initialize an empty request (used to serialize into when reading from a stream). + */ + RecoveryHandoffPrimaryContextRequest() { + } + + /** + * Initialize a request for the specified relocation. 
+ * + * @param recoveryId the recovery ID of the relocation + * @param shardId the shard ID of the relocation + * @param primaryContext the primary context + */ + RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, final PrimaryContext primaryContext) { + this.recoveryId = recoveryId; + this.shardId = shardId; + this.primaryContext = primaryContext; + } + + long recoveryId() { + return this.recoveryId; + } + + ShardId shardId() { + return shardId; + } + + PrimaryContext primaryContext() { + return primaryContext; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + recoveryId = in.readLong(); + shardId = ShardId.readShardId(in); + primaryContext = new PrimaryContext(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(recoveryId); + shardId.writeTo(out); + primaryContext.writeTo(out); + } + + @Override + public String toString() { + return "RecoveryHandoffPrimaryContextRequest{" + + "recoveryId=" + recoveryId + + ", shardId=" + shardId + + ", primaryContext=" + primaryContext + + '}'; + } +} diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 36f71899fa8da..3097c8e668f58 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -31,6 +31,7 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; @@ -41,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import 
org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -52,6 +54,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; import java.io.BufferedOutputStream; @@ -60,7 +63,9 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.StreamSupport; @@ -450,7 +455,21 @@ public void finalizeRecovery(final long targetLocalCheckpoint) { StopWatch stopWatch = new StopWatch().start(); logger.trace("finalizing recovery"); cancellableThreads.execute(() -> { - shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint); + /* + * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a + * shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done + * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire + * the permit then the state of the shard will be relocated and this recovery will fail. 
+ */ + final PlainActionFuture onAcquired = new PlainActionFuture<>(); + shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME); + try (Releasable ignored = onAcquired.actionGet()) { + if (shard.state() == IndexShardState.RELOCATED) { + throw new IndexShardRelocatedException(shard.shardId()); + } + shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint); + } + recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()); }); @@ -465,7 +484,7 @@ public void finalizeRecovery(final long targetLocalCheckpoint) { cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion)); logger.trace("performing relocation hand-off"); - cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode())); + cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext)); } /* * if the recovery process fails after setting the shard state to RELOCATED, both relocation source and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 3b96ef1a02ebc..2837a85d1ae98 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -41,10 +41,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import 
org.elasticsearch.index.store.StoreFileMetaData; @@ -63,7 +63,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongConsumer; -import java.util.stream.Collectors; /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of @@ -379,6 +378,11 @@ public void ensureClusterStateVersion(long clusterStateVersion) { ensureClusterStateVersionCallback.accept(clusterStateVersion); } + @Override + public void handoffPrimaryContext(final PrimaryContext primaryContext) { + indexShard.updateAllocationIdsFromPrimaryContext(primaryContext); + } + @Override public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final RecoveryState.Translog translog = state().getTranslog(); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 42cf1bc1ce19d..34b0df2293f3f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -49,6 +50,13 @@ public interface RecoveryTargetHandler { */ void ensureClusterStateVersion(long clusterStateVersion); + /** + * Handoff the primary context between the relocation source and the relocation target. 
+ * + * @param primaryContext the primary context from the relocation source + */ + void handoffPrimaryContext(PrimaryContext primaryContext); + /** * Index a set of translog operations on the target * @param operations operations to index diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 414cbd4ea49eb..14c8f762e6dac 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -95,7 +96,17 @@ public void ensureClusterStateVersion(long clusterStateVersion) { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE, new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + + @Override + public void handoffPrimaryContext(final PrimaryContext primaryContext) { + transportService.submitRequest( + targetNode, + PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT, + new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } @Override diff --git 
a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8811083baa9cb..e9c8916634884 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2022,7 +2022,10 @@ public void testSeqNoAndCheckpoints() throws IOException { initialEngine = engine; initialEngine .seqNoService() - .updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet()); + .updateAllocationIdsFromMaster( + randomNonNegativeLong(), + new HashSet<>(Arrays.asList("primary", "replica")), + Collections.emptySet()); for (int op = 0; op < opCount; op++) { final String id; // mostly index, sometimes delete diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index e7518bd5944b9..7962f23caf0c0 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -122,6 +122,7 @@ protected DiscoveryNode getDiscoveryNode(String id) { } protected class ReplicationGroup implements AutoCloseable, Iterable { + private long clusterStateVersion; private IndexShard primary; private IndexMetaData indexMetaData; private final List replicas; @@ -142,6 +143,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting)); replicas = new ArrayList<>(); this.indexMetaData = indexMetaData; + clusterStateVersion = 1; updateAllocationIDsOnPrimary(); for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) { addReplica(); @@ -222,6 +224,7 @@ public void startPrimary() throws IOException { 
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null)); primary.recoverFromStore(); primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry())); + clusterStateVersion++; updateAllocationIDsOnPrimary(); for (final IndexShard replica : replicas) { recoverReplica(replica); @@ -241,6 +244,7 @@ assert shardRoutings().stream() .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; replicas.add(replica); + clusterStateVersion++; updateAllocationIDsOnPrimary(); } @@ -255,6 +259,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting)); replicas.add(newReplica); + clusterStateVersion++; updateAllocationIDsOnPrimary(); return newReplica; } @@ -274,6 +279,7 @@ public synchronized Future promoteReplicaToPrim closeShards(primary); primary = replica; primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); + PlainActionFuture fut = new PlainActionFuture<>(); primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard, new ActionListener() { @@ -289,6 +295,7 @@ public void onFailure(Exception e) { fut.onFailure(e); } })); + clusterStateVersion++; updateAllocationIDsOnPrimary(); return fut; } @@ -296,6 +303,7 @@ public void onFailure(Exception e) { synchronized boolean removeReplica(IndexShard replica) { final boolean removed = replicas.remove(replica); if (removed) { + clusterStateVersion++; updateAllocationIDsOnPrimary(); } return removed; @@ -315,6 +323,7 @@ public void recoverReplica( BiFunction targetSupplier, boolean markAsRecovering) throws IOException { ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering); + 
clusterStateVersion++; updateAllocationIDsOnPrimary(); } @@ -402,7 +411,7 @@ private void updateAllocationIDsOnPrimary() { initializing.add(shard.allocationId().getId()); } } - primary.updateAllocationIdsFromMaster(active, initializing); + primary.updateAllocationIdsFromMaster(clusterStateVersion, active, initializing); } } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java index 61eb45813288b..ae4aab107f207 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java @@ -19,9 +19,13 @@ package org.elasticsearch.index.seqno; +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.ObjectLongMap; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -29,7 +33,6 @@ import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,11 +46,15 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; public class 
GlobalCheckpointTrackerTests extends ESTestCase { @@ -79,6 +86,7 @@ private Map randomAllocationsWithLocalCheckpoints(int min, int max } public void testGlobalCheckpointUpdate() { + final long initialClusterStateVersion = randomNonNegativeLong(); Map allocations = new HashMap<>(); Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5); Set active = new HashSet<>(activeWithCheckpoints.keySet()); @@ -107,7 +115,7 @@ public void testGlobalCheckpointUpdate() { logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type); }); - tracker.updateAllocationIdsFromMaster(active, initializing); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing); initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId, tracker.getGlobalCheckpoint())); allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId, allocations.get(aId))); @@ -130,7 +138,7 @@ public void testGlobalCheckpointUpdate() { Set newActive = new HashSet<>(active); newActive.add(extraId); - tracker.updateAllocationIdsFromMaster(newActive, initializing); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActive, initializing); // now notify for the new id tracker.updateLocalCheckpoint(extraId, minLocalCheckpointAfterUpdates + 1 + randomInt(4)); @@ -146,6 +154,7 @@ public void testMissingActiveIdsPreventAdvance() { assigned.putAll(active); assigned.putAll(initializing); tracker.updateAllocationIdsFromMaster( + randomNonNegativeLong(), active.keySet(), initializing.keySet()); randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); @@ -166,7 +175,7 @@ public void testMissingActiveIdsPreventAdvance() { public void testMissingInSyncIdsPreventAdvance() { final Map active = randomAllocationsWithLocalCheckpoints(0, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); - 
tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); + tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet()); initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); randomSubsetOf(randomInt(initializing.size() - 1), initializing.keySet()).forEach(aId -> tracker.updateLocalCheckpoint(aId, initializing.get(aId))); @@ -184,7 +193,7 @@ public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { final Map active = randomAllocationsWithLocalCheckpoints(1, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); final Map nonApproved = randomAllocationsWithLocalCheckpoints(1, 5); - tracker.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet()); + tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), active.keySet(), initializing.keySet()); initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); nonApproved.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, tracker.getGlobalCheckpoint())); @@ -196,6 +205,7 @@ public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { } public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { + final long initialClusterStateVersion = randomNonNegativeLong(); final Map activeToStay = randomAllocationsWithLocalCheckpoints(1, 5); final Map initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5); final Map activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5); @@ -211,7 +221,7 @@ public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { if (randomBoolean()) { allocations.putAll(initializingToBeRemoved); } - tracker.updateAllocationIdsFromMaster(active, initializing); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, active, initializing); if (randomBoolean()) { initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k, 
tracker.getGlobalCheckpoint())); } else { @@ -223,11 +233,11 @@ public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { // now remove shards if (randomBoolean()) { - tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet()); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, activeToStay.keySet(), initializingToStay.keySet()); allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); } else { allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid, ckp + 10L)); - tracker.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet()); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, activeToStay.keySet(), initializingToStay.keySet()); } final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream()) @@ -243,7 +253,8 @@ public void testWaitForAllocationIdToBeInSync() throws BrokenBarrierException, I final AtomicBoolean complete = new AtomicBoolean(); final String inSyncAllocationId =randomAlphaOfLength(16); final String trackingAllocationId = randomAlphaOfLength(16); - tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); + tracker.updateAllocationIdsFromMaster( + randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint); final Thread thread = new Thread(() -> { try { @@ -291,7 +302,8 @@ public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBar final AtomicBoolean interrupted = new AtomicBoolean(); final String inSyncAllocationId = randomAlphaOfLength(16); final String trackingAllocationId = randomAlphaOfLength(32); - tracker.updateAllocationIdsFromMaster(Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); + 
tracker.updateAllocationIdsFromMaster( + randomNonNegativeLong(), Collections.singleton(inSyncAllocationId), Collections.singleton(trackingAllocationId)); tracker.updateLocalCheckpoint(inSyncAllocationId, globalCheckpoint); final Thread thread = new Thread(() -> { try { @@ -329,21 +341,14 @@ public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBar } public void testUpdateAllocationIdsFromMaster() throws Exception { + final long initialClusterStateVersion = randomNonNegativeLong(); final int numberOfActiveAllocationsIds = randomIntBetween(2, 16); - final Set activeAllocationIds = - IntStream.range(0, numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16)).collect(Collectors.toSet()); final int numberOfInitializingIds = randomIntBetween(2, 16); - final Set initializingIds = - IntStream.range(0, numberOfInitializingIds).mapToObj(i -> { - do { - final String initializingId = randomAlphaOfLength(16); - // ensure we do not duplicate an allocation ID in active and initializing sets - if (!activeAllocationIds.contains(initializingId)) { - return initializingId; - } - } while (true); - }).collect(Collectors.toSet()); - tracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingIds); + final Tuple, Set> activeAndInitializingAllocationIds = + randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); + final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); + final Set initializingIds = activeAndInitializingAllocationIds.v2(); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingIds); // first we assert that the in-sync and tracking sets are set up correctly assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); @@ -364,7 +369,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { final List removingInitializingAllocationIds = randomSubsetOf(initializingIds); 
final Set newInitializingAllocationIds = initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet()); - tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 1, newActiveAllocationIds, newInitializingAllocationIds); assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); assertTrue(removingActiveAllocationIds.stream().noneMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); assertTrue(newInitializingAllocationIds.stream().allMatch(a -> tracker.trackingLocalCheckpoints.containsKey(a))); @@ -376,7 +381,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { */ newActiveAllocationIds.add(randomAlphaOfLength(32)); newInitializingAllocationIds.add(randomAlphaOfLength(64)); - tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 2, newActiveAllocationIds, newInitializingAllocationIds); assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.inSyncLocalCheckpoints.containsKey(a))); assertTrue( newActiveAllocationIds @@ -416,7 +421,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { // using a different length than we have been using above ensures that we can not collide with a previous allocation ID final String newSyncingAllocationId = randomAlphaOfLength(128); newInitializingAllocationIds.add(newSyncingAllocationId); - tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 3, newActiveAllocationIds, newInitializingAllocationIds); final CyclicBarrier barrier = new CyclicBarrier(2); final Thread thread = new Thread(() -> { try { @@ -450,7 +455,7 @@ public void testUpdateAllocationIdsFromMaster() throws 
Exception { * the in-sync set even if we receive a cluster state update that does not reflect this. * */ - tracker.updateAllocationIdsFromMaster(newActiveAllocationIds, newInitializingAllocationIds); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion + 4, newActiveAllocationIds, newInitializingAllocationIds); assertFalse(tracker.trackingLocalCheckpoints.containsKey(newSyncingAllocationId)); assertTrue(tracker.inSyncLocalCheckpoints.containsKey(newSyncingAllocationId)); } @@ -471,7 +476,7 @@ public void testRaceUpdatingGlobalCheckpoint() throws InterruptedException, Brok final String active = randomAlphaOfLength(16); final String initializing = randomAlphaOfLength(32); - tracker.updateAllocationIdsFromMaster(Collections.singleton(active), Collections.singleton(initializing)); + tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(active), Collections.singleton(initializing)); final CyclicBarrier barrier = new CyclicBarrier(4); @@ -516,7 +521,216 @@ public void testRaceUpdatingGlobalCheckpoint() throws InterruptedException, Brok markingThread.join(); assertThat(tracker.getGlobalCheckpoint(), equalTo((long) nextActiveLocalCheckpoint)); + } + + public void testPrimaryContextOlderThanAppliedClusterState() { + final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE - 1) + 1; + final int numberOfActiveAllocationsIds = randomIntBetween(0, 8); + final int numberOfInitializingIds = randomIntBetween(0, 8); + final Tuple, Set> activeAndInitializingAllocationIds = + randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); + final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); + final Set initializingAllocationIds = activeAndInitializingAllocationIds.v2(); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds); + + /* + * We are going to establish a primary context from a cluster state 
version older than the applied cluster state version on the + * tracker. Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the + * newer cluster state is a superset of the allocation IDs in the applied cluster state with the caveat that an existing + * initializing allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the + * set of initializing allocation IDs is otherwise arbitrary. + */ + final int numberOfAdditionalInitializingAllocationIds = randomIntBetween(0, 8); + final Set initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds)); + final Set newInitializingAllocationIds = + randomAllocationIdsExcludingExistingIds( + Sets.union(activeAllocationIds, initializingAllocationIds), numberOfAdditionalInitializingAllocationIds); + final Set contextInitializingIds = Sets.union( + new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))), + newInitializingAllocationIds); + + final int numberOfAdditionalActiveAllocationIds = randomIntBetween(0, 8); + final Set contextActiveAllocationIds = Sets.union( + Sets.union( + activeAllocationIds, + randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfAdditionalActiveAllocationIds)), + initializedAllocationIds); + + final ObjectLongMap activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>(); + for (final String allocationId : contextActiveAllocationIds) { + activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); + } + final ObjectLongMap initializingAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>(); + for (final String allocationId : contextInitializingIds) { + initializingAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); + } + + final PrimaryContext primaryContext = new PrimaryContext( + initialClusterStateVersion - randomIntBetween(0, 
Math.toIntExact(initialClusterStateVersion) - 1), + activeAllocationIdsLocalCheckpoints, + initializingAllocationIdsLocalCheckpoints); + + tracker.updateAllocationIdsFromPrimaryContext(primaryContext); + + // the primary context carries an older cluster state version + assertThat(tracker.appliedClusterStateVersion, equalTo(initialClusterStateVersion)); + + // only existing active allocation IDs and initializing allocation IDs that moved to initialized should be in-sync + assertThat( + Sets.union(activeAllocationIds, initializedAllocationIds), + equalTo( + StreamSupport + .stream(tracker.inSyncLocalCheckpoints.keys().spliterator(), false) + .map(e -> e.value) + .collect(Collectors.toSet()))); + + // the local checkpoints known to the tracker for in-sync shards should match what is known in the primary context + for (final String allocationId : Sets.union(activeAllocationIds, initializedAllocationIds)) { + assertThat( + tracker.inSyncLocalCheckpoints.get(allocationId), equalTo(primaryContext.inSyncLocalCheckpoints().get(allocationId))); + } + + // only existing initializing allocation IDs that did not move to initialized should be tracked + assertThat( + Sets.difference(initializingAllocationIds, initializedAllocationIds), + equalTo( + StreamSupport + .stream(tracker.trackingLocalCheckpoints.keys().spliterator(), false) + .map(e -> e.value) + .collect(Collectors.toSet()))); + + // the local checkpoints known to the tracker for initializing shards should match what is known in the primary context + for (final String allocationId : Sets.difference(initializingAllocationIds, initializedAllocationIds)) { + if (primaryContext.trackingLocalCheckpoints().containsKey(allocationId)) { + assertThat( + tracker.trackingLocalCheckpoints.get(allocationId), + equalTo(primaryContext.trackingLocalCheckpoints().get(allocationId))); + } else { + assertThat(tracker.trackingLocalCheckpoints.get(allocationId), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + } + + // the global
checkpoint can only be computed from active allocation IDs and initializing allocation IDs that moved to initialized + final long globalCheckpoint = + StreamSupport + .stream(activeAllocationIdsLocalCheckpoints.spliterator(), false) + .filter(e -> tracker.inSyncLocalCheckpoints.containsKey(e.key) || initializedAllocationIds.contains(e.key)) + .mapToLong(e -> e.value) + .min() + .orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO); + assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint)); + } + + public void testPrimaryContextNewerThanAppliedClusterState() { + final long initialClusterStateVersion = randomIntBetween(0, Integer.MAX_VALUE); + final int numberOfActiveAllocationsIds = randomIntBetween(0, 8); + final int numberOfInitializingIds = randomIntBetween(0, 8); + final Tuple, Set> activeAndInitializingAllocationIds = + randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); + final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); + final Set initializingAllocationIds = activeAndInitializingAllocationIds.v2(); + tracker.updateAllocationIdsFromMaster(initialClusterStateVersion, activeAllocationIds, initializingAllocationIds); + + /* + * We are going to establish a primary context from a cluster state version newer than the applied cluster state version on the + * tracker. Because of recovery barriers established during relocation handoff, we know that the set of active allocation IDs in the + * newer cluster state is a subset of the allocation IDs in the applied cluster state with the caveat that an existing initializing + * allocation ID could have moved to an in-sync allocation ID within the tracker due to recovery finalization, and the set of + * initializing allocation IDs is otherwise arbitrary.
+ */ + final int numberOfNewInitializingAllocationIds = randomIntBetween(0, 8); + final Set initializedAllocationIds = new HashSet<>(randomSubsetOf(initializingAllocationIds)); + final Set newInitializingAllocationIds = + randomAllocationIdsExcludingExistingIds( + Sets.union(activeAllocationIds, initializingAllocationIds), numberOfNewInitializingAllocationIds); + + final ObjectLongMap activeAllocationIdsLocalCheckpoints = new ObjectLongHashMap<>(); + for (final String allocationId : Sets.union(new HashSet<>(randomSubsetOf(activeAllocationIds)), initializedAllocationIds)) { + activeAllocationIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); + } + final ObjectLongMap initializingIdsLocalCheckpoints = new ObjectLongHashMap<>(); + final Set contextInitializingAllocationIds = Sets.union( + new HashSet<>(randomSubsetOf(Sets.difference(initializingAllocationIds, initializedAllocationIds))), + newInitializingAllocationIds); + for (final String allocationId : contextInitializingAllocationIds) { + initializingIdsLocalCheckpoints.put(allocationId, randomNonNegativeLong()); + } + + final PrimaryContext primaryContext = + new PrimaryContext( + initialClusterStateVersion + randomIntBetween(0, Integer.MAX_VALUE) + 1, + activeAllocationIdsLocalCheckpoints, + initializingIdsLocalCheckpoints); + + tracker.updateAllocationIdsFromPrimaryContext(primaryContext); + + final PrimaryContext trackerPrimaryContext = tracker.primaryContext(); + try { + assertTrue(tracker.sealed()); + final long globalCheckpoint = + StreamSupport + .stream(activeAllocationIdsLocalCheckpoints.values().spliterator(), false) + .mapToLong(e -> e.value) + .min() + .orElse(SequenceNumbersService.UNASSIGNED_SEQ_NO); + + // the primary context contains knowledge of the state of the entire universe + assertThat(primaryContext.clusterStateVersion(), equalTo(trackerPrimaryContext.clusterStateVersion())); + assertThat(primaryContext.inSyncLocalCheckpoints(), 
equalTo(trackerPrimaryContext.inSyncLocalCheckpoints())); + assertThat(primaryContext.trackingLocalCheckpoints(), equalTo(trackerPrimaryContext.trackingLocalCheckpoints())); + assertThat(tracker.getGlobalCheckpoint(), equalTo(globalCheckpoint)); + } finally { + tracker.releasePrimaryContext(); + assertFalse(tracker.sealed()); + } + } + + public void testPrimaryContextSealing() { + // the tracker should start in the state of not being sealed + assertFalse(tracker.sealed()); + + // sampling the primary context should seal the tracker + tracker.primaryContext(); + assertTrue(tracker.sealed()); + + // invoking any method that mutates the state of the tracker should fail + assertIllegalStateExceptionWhenSealed(() -> tracker.updateLocalCheckpoint(randomAlphaOfLength(16), randomNonNegativeLong())); + assertIllegalStateExceptionWhenSealed(() -> tracker.updateGlobalCheckpointOnReplica(randomNonNegativeLong())); + assertIllegalStateExceptionWhenSealed( + () -> tracker.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.emptySet(), Collections.emptySet())); + assertIllegalStateExceptionWhenSealed(() -> tracker.updateAllocationIdsFromPrimaryContext(mock(PrimaryContext.class))); + assertIllegalStateExceptionWhenSealed(() -> tracker.primaryContext()); + assertIllegalStateExceptionWhenSealed(() -> tracker.markAllocationIdAsInSync(randomAlphaOfLength(16), randomNonNegativeLong())); + + // releasing the primary context should unseal the tracker + tracker.releasePrimaryContext(); + assertFalse(tracker.sealed()); + } + + private void assertIllegalStateExceptionWhenSealed(final ThrowingRunnable runnable) { + final IllegalStateException e = expectThrows(IllegalStateException.class, runnable); + assertThat(e, hasToString(containsString("global checkpoint tracker is sealed"))); + } + + private Tuple, Set> randomActiveAndInitializingAllocationIds( + final int numberOfActiveAllocationsIds, + final int numberOfInitializingIds) { + final Set activeAllocationIds = + IntStream.range(0,
numberOfActiveAllocationsIds).mapToObj(i -> randomAlphaOfLength(16) + i).collect(Collectors.toSet()); + final Set initializingIds = randomAllocationIdsExcludingExistingIds(activeAllocationIds, numberOfInitializingIds); + return Tuple.tuple(activeAllocationIds, initializingIds); + } + private Set randomAllocationIdsExcludingExistingIds(final Set existingAllocationIds, final int numberOfAllocationIds) { + return IntStream.range(0, numberOfAllocationIds).mapToObj(i -> { + do { + final String newAllocationId = randomAlphaOfLength(16) + i; + // check the suffixed ID that is actually returned so that it can not duplicate an existing allocation ID + if (!existingAllocationIds.contains(newAllocationId)) { + return newAllocationId; + } + } while (true); + }).collect(Collectors.toSet()); } private void markAllocationIdAsInSyncQuietly( diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index cc837a0afe192..a341c26898759 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -536,7 +536,7 @@ public void testOperationPermitOnReplicaShards() throws InterruptedException, Ex routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); indexShard.updateRoutingEntry(routing); - indexShard.relocated("test"); + indexShard.relocated("test", primaryContext -> {}); engineClosed = false; break; } @@ -551,7 +551,7 @@ public void testOperationPermitOnReplicaShards() throws InterruptedException, Ex if (shardRouting.primary() == false) { final IllegalStateException e = expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX)); - assertThat(e, hasToString(containsString("shard is not a primary"))); + assertThat(e, hasToString(containsString("shard " +
shardRouting + " is not a primary"))); } final long primaryTerm = indexShard.getPrimaryTerm(); @@ -1042,7 +1042,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { Thread recoveryThread = new Thread(() -> { latch.countDown(); try { - shard.relocated("simulated recovery"); + shard.relocated("simulated recovery", primaryContext -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1071,7 +1071,7 @@ public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); Thread recoveryThread = new Thread(() -> { try { - shard.relocated("simulated recovery"); + shard.relocated("simulated recovery", primaryContext -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1124,7 +1124,7 @@ public void run() { AtomicBoolean relocated = new AtomicBoolean(); final Thread recoveryThread = new Thread(() -> { try { - shard.relocated("simulated recovery"); + shard.relocated("simulated recovery", primaryContext -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1158,7 +1158,7 @@ public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedE final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node")); - shard.relocated("test"); + shard.relocated("test", primaryContext -> {}); expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting)); closeShards(shard); } @@ -1168,7 +1168,7 @@ public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOE final ShardRouting originalRouting = shard.routingEntry(); shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node")); shard.updateRoutingEntry(originalRouting); - expectThrows(IllegalIndexShardStateException.class, () -> 
shard.relocated("test")); + expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test", primaryContext -> {})); closeShards(shard); } @@ -1187,7 +1187,7 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.relocated("test"); + shard.relocated("test", primaryContext -> {}); } }); relocationThread.start(); @@ -1362,7 +1362,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); shard.updateRoutingEntry(inRecoveryRouting); - shard.relocated("simulate mark as relocated"); + shard.relocated("simulate mark as relocated", primaryContext -> {}); assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); try { shard.updateRoutingEntry(origRouting); diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index d19a51e6271db..bf0b72867406e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -62,7 +62,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1; String allocationId = shard.routingEntry().allocationId().getId(); - shard.updateAllocationIdsFromMaster(Collections.singleton(allocationId), Collections.emptySet()); + shard.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(allocationId), Collections.emptySet()); shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint); assertEquals(globalCheckPoint, shard.getGlobalCheckpoint()); diff --git 
a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 7a53f8f9f5949..73e2d7eb0bd1c 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -322,6 +322,7 @@ public Index index() { * Mock for {@link IndexShard} */ protected class MockIndexShard implements IndicesClusterStateService.Shard { + private volatile long clusterStateVersion; private volatile ShardRouting shardRouting; private volatile RecoveryState recoveryState; private volatile Set activeAllocationIds; @@ -372,7 +373,9 @@ public void updatePrimaryTerm(final long newPrimaryTerm, } @Override - public void updateAllocationIdsFromMaster(Set activeAllocationIds, Set initializingAllocationIds) { + public void updateAllocationIdsFromMaster( + long applyingClusterStateVersion, Set activeAllocationIds, Set initializingAllocationIds) { + this.clusterStateVersion = applyingClusterStateVersion; this.activeAllocationIds = activeAllocationIds; this.initializingAllocationIds = initializingAllocationIds; } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index e73bd8a949748..5532ad040f26d 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; 
import org.elasticsearch.common.bytes.BytesArray; @@ -76,6 +77,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -453,8 +455,14 @@ public void testWaitForClusterStateOnPrimaryRelocation() throws IOException, Int relocated.set(true); assertTrue(recoveriesDelayed.get()); return null; - }).when(shard).relocated(any(String.class)); + }).when(shard).relocated(any(String.class), any(Consumer.class)); when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + final ActionListener listener = (ActionListener)invocationOnMock.getArguments()[0]; + listener.onResponse(() -> {}); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(ActionListener.class), any(String.class)); // final Engine.IndexCommitRef indexCommitRef = mock(Engine.IndexCommitRef.class); // when(shard.acquireIndexCommit(anyBoolean())).thenReturn(indexCommitRef); diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index e1a7a07448f1b..b0d25f43bd694 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -53,7 +53,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; -@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE") +@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.index.seqno:TRACE,org.elasticsearch.indices.recovery:TRACE") public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { private 
final Logger logger = Loggers.getLogger(RecoveryWhileUnderLoadIT.class); diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index fe83847bff218..48f6fdeaedbbd 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -514,14 +514,6 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr // refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down client().admin().indices().prepareRefresh("test").get(); - /* - * We have to execute a second refresh as in the face of relocations, the relocation target is not aware of the in-sync set and so - * the first refresh would bring back the local checkpoint for any shards added to the in-sync set that the relocation target was - * not tracking. - */ - // TODO: remove this after a primary context is transferred during relocation handoff - client().admin().indices().prepareRefresh("test").get(); - } class RecoveryCorruption extends MockTransportService.DelegateTransport {