Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void execute() throws Exception {
// we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
// is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher as its local checkpoint.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,10 +677,15 @@ public static void writeToThin(IndexShardRoutingTable indexShard, StreamOutput o
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("shard_id [").append(shardId().getIndex().getName()).append("][").append(shardId().id()).append("]\n");
for (ShardRouting shard : this) {
sb.append("--").append(shard.shortSummary()).append("\n");
sb.append("IndexShardRoutingTable(").append(shardId()).append("){");
final int numShards = shards.size();
for (int i = 0; i < numShards; i++) {
sb.append(shards.get(i).shortSummary());
if (i < numShards - 1) {
sb.append(", ");
}
}
sb.append("}");
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* - computed based on local checkpoints, if the tracker is in primary mode
* - received from the primary, if the tracker is in replica mode
*/
long globalCheckpoint;
volatile long globalCheckpoint;

/**
* Cached value for the last replication group that was computed
*/
private ReplicationGroup cachedReplicationGroup;
volatile ReplicationGroup replicationGroup;

public static class LocalCheckpointState implements Writeable {

Expand Down Expand Up @@ -204,6 +204,9 @@ private boolean invariant() {
// there is at least one in-sync shard copy when the global checkpoint tracker operates in primary mode (i.e. the shard itself)
assert !primaryMode || localCheckpoints.values().stream().anyMatch(lcps -> lcps.inSync);

// the routing table and replication group is set when the global checkpoint tracker operates in primary mode
assert !primaryMode || (routingTable != null && replicationGroup != null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message?


// during relocation handoff there are no entries blocking global checkpoint advancement
assert !handoffInProgress || pendingInSync.isEmpty() :
"entries blocking global checkpoint advancement during relocation handoff: " + pendingInSync;
Expand All @@ -216,8 +219,12 @@ private boolean invariant() {
"global checkpoint is not up-to-date, expected: " +
computeGlobalCheckpoint(pendingInSync, localCheckpoints.values(), globalCheckpoint) + " but was: " + globalCheckpoint;

assert cachedReplicationGroup == null || cachedReplicationGroup.equals(calculateReplicationGroup()) :
"cached replication group out of sync: expected: " + calculateReplicationGroup() + " but was: " + cachedReplicationGroup;
// we have a routing table iff we have a replication group
assert (routingTable == null) == (replicationGroup == null) :
"routing table is " + routingTable + " but replication group is " + replicationGroup;

assert replicationGroup == null || replicationGroup.equals(calculateReplicationGroup()) :
"cached replication group out of sync: expected: " + calculateReplicationGroup() + " but was: " + replicationGroup;

// all assigned shards from the routing table are tracked
assert routingTable == null || localCheckpoints.keySet().containsAll(routingTable.getAllAllocationIds()) :
Expand Down Expand Up @@ -249,7 +256,8 @@ private boolean invariant() {
this.globalCheckpoint = globalCheckpoint;
this.localCheckpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
this.pendingInSync = new HashSet<>();
this.cachedReplicationGroup = null;
this.routingTable = null;
this.replicationGroup = null;
assert invariant();
}

Expand All @@ -258,29 +266,22 @@ private boolean invariant() {
*
* @return the replication group
*/
public synchronized ReplicationGroup getReplicationGroup() {
public ReplicationGroup getReplicationGroup() {
assert primaryMode;
if (cachedReplicationGroup == null) {
cachedReplicationGroup = calculateReplicationGroup();
}
return cachedReplicationGroup;
return replicationGroup;
}

private synchronized ReplicationGroup calculateReplicationGroup() {
private ReplicationGroup calculateReplicationGroup() {
return new ReplicationGroup(routingTable,
localCheckpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()));
}

private synchronized void invalidateReplicationGroupCache() {
cachedReplicationGroup = null;
}

/**
* Returns the global checkpoint for the shard.
*
* @return the global checkpoint
*/
public synchronized long getGlobalCheckpoint() {
public long getGlobalCheckpoint() {
return globalCheckpoint;
}

Expand Down Expand Up @@ -370,7 +371,7 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
}
appliedClusterStateVersion = applyingClusterStateVersion;
this.routingTable = routingTable;
invalidateReplicationGroupCache();
replicationGroup = calculateReplicationGroup();
if (primaryMode && removedEntries) {
updateGlobalCheckpointOnPrimary();
}
Expand Down Expand Up @@ -435,7 +436,7 @@ public synchronized void markAllocationIdAsInSync(final String allocationId, fin
}
} else {
lcps.inSync = true;
invalidateReplicationGroupCache();
replicationGroup = calculateReplicationGroup();
logger.trace("marked [{}] as in-sync", allocationId);
updateGlobalCheckpointOnPrimary();
}
Expand Down Expand Up @@ -481,7 +482,7 @@ public synchronized void updateLocalCheckpoint(final String allocationId, final
pendingInSync.remove(allocationId);
pending = false;
lcps.inSync = true;
invalidateReplicationGroupCache();
replicationGroup = calculateReplicationGroup();
logger.trace("marked [{}] as in-sync", allocationId);
notifyAllWaiters();
}
Expand Down Expand Up @@ -601,6 +602,7 @@ public synchronized void activateWithPrimaryContext(PrimaryContext primaryContex
localCheckpoints.put(entry.getKey(), entry.getValue().copy());
}
routingTable = primaryContext.getRoutingTable();
replicationGroup = calculateReplicationGroup();
updateGlobalCheckpointOnPrimary();
// reapply missed cluster state update
// note that if there was no cluster state update between start of the engine of this shard and the call to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,16 @@ public StartRecoveryRequest getRequest() {
* performs the recovery from the local engine to the target
*/
public RecoveryResponse recoverToTarget() throws IOException {
cancellableThreads.execute(() -> runUnderOperationPermit(() -> {
runUnderPrimaryPermit(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
if (targetShardRouting == null) {
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
request.targetNode());
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
if (targetShardRouting.initializing() == false) {
logger.debug("delaying recovery of {} as it is not listed as initializing on the source node {}. " +
"known shards state is [{}]", request.shardId(), request.sourceNode(), targetShardRouting.state());
throw new DelayRecoveryException("source node has the state of the target shard to be [" +
targetShardRouting.state() + "], expecting to be [initializing]");
}
}));
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
});

try (Translog.View translogView = shard.acquireTranslogView()) {

Expand Down Expand Up @@ -179,7 +174,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
}
}

cancellableThreads.execute(() -> runUnderOperationPermit(() -> shard.initiateTracking(request.targetAllocationId())));
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));

try {
prepareTargetForTranslog(translogView.estimateTotalOperations(startingSeqNo));
Expand All @@ -200,15 +195,19 @@ public RecoveryResponse recoverToTarget() throws IOException {
return response;
}

private void runUnderOperationPermit(CancellableThreads.Interruptable runnable) throws InterruptedException {
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME);
try (Releasable ignored = onAcquired.actionGet()) {
if (shard.state() == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shard.shardId());
private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
cancellableThreads.execute(() -> {
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME);
try (Releasable ignored = onAcquired.actionGet()) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will change its state to RELOCATED only when it holds all operation permits, see IndexShard.relocated()
if (shard.state() == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shard.shardId());
}
runnable.run();
}
runnable.run();
}
});
}

/**
Expand Down Expand Up @@ -461,19 +460,18 @@ public void finalizeRecovery(final long targetLocalCheckpoint) {
cancellableThreads.checkForCancel();
StopWatch stopWatch = new StopWatch().start();
logger.trace("finalizing recovery");
cancellableThreads.execute(() -> {
/*
* Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a
* shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done
* marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
* the permit then the state of the shard will be relocated and this recovery will fail.
*/
runUnderOperationPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint));
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
});
/*
* Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a
* shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done
* marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
* the permit then the state of the shard will be relocated and this recovery will fail.
*/
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint));
cancellableThreads.execute(() -> recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()));

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment here say that this will acquire all permits and will thus will delay new recoveries until it's done?

/*
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,12 @@ public void testUpdateAllocationIdsFromMaster() throws Exception {
randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable(initializingIds), emptySet());
IndexShardRoutingTable routingTable = routingTable(initializingIds);
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet());
AllocationId primaryId = activeAllocationIds.iterator().next();
tracker.activatePrimaryMode(primaryId.getId(), NO_OPS_PERFORMED);
assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));

// first we assert that the in-sync and tracking sets are set up correctly
assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
Expand All @@ -400,18 +403,22 @@ public void testUpdateAllocationIdsFromMaster() throws Exception {
== SequenceNumbersService.UNASSIGNED_SEQ_NO));

// now we will remove some allocation IDs from these and ensure that they propagate through
final List<AllocationId> removingActiveAllocationIds = randomSubsetOf(activeAllocationIds);
final Set<AllocationId> removingActiveAllocationIds = new HashSet<>(randomSubsetOf(activeAllocationIds));
final Set<AllocationId> newActiveAllocationIds =
activeAllocationIds.stream().filter(a -> !removingActiveAllocationIds.contains(a)).collect(Collectors.toSet());
final List<AllocationId> removingInitializingAllocationIds = randomSubsetOf(initializingIds);
final Set<AllocationId> newInitializingAllocationIds =
initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet());
tracker.updateFromMaster(initialClusterStateVersion + 1, ids(newActiveAllocationIds), routingTable(newInitializingAllocationIds),
routingTable = routingTable(newInitializingAllocationIds);
tracker.updateFromMaster(initialClusterStateVersion + 1, ids(newActiveAllocationIds), routingTable,
emptySet());
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
assertTrue(removingActiveAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()) == null));
assertTrue(newInitializingAllocationIds.stream().noneMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
assertTrue(removingInitializingAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()) == null));
assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(
ids(Sets.difference(Sets.union(activeAllocationIds, newActiveAllocationIds), removingActiveAllocationIds))));
assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));

/*
* Now we will add an allocation ID to each of active and initializing and ensure they propagate through. Using different lengths
Expand Down Expand Up @@ -670,6 +677,7 @@ public void testPrimaryContextHandoff() throws IOException {
assertThat(newPrimary.localCheckpoints, equalTo(oldPrimary.localCheckpoints));
assertThat(newPrimary.globalCheckpoint, equalTo(oldPrimary.globalCheckpoint));
assertThat(newPrimary.routingTable, equalTo(oldPrimary.routingTable));
assertThat(newPrimary.replicationGroup, equalTo(oldPrimary.replicationGroup));

oldPrimary.completeRelocationHandoff();
assertFalse(oldPrimary.primaryMode);
Expand Down