Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

Expand All @@ -49,8 +47,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
Expand All @@ -59,7 +57,6 @@ public class ReplicationOperation<
> {
private final Logger logger;
private final Request request;
private final Supplier<ClusterState> clusterStateSupplier;
private final String opType;
private final AtomicInteger totalShards = new AtomicInteger();
/**
Expand All @@ -86,13 +83,12 @@ public class ReplicationOperation<
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
Replicas<ReplicaRequest> replicas,
Supplier<ClusterState> clusterStateSupplier, Logger logger, String opType) {
Logger logger, String opType) {
this.replicasProxy = replicas;
this.primary = primary;
this.resultListener = listener;
this.logger = logger;
this.request = request;
this.clusterStateSupplier = clusterStateSupplier;
this.opType = opType;
}

Expand All @@ -117,51 +113,45 @@ public void execute() throws Exception {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}

// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
ClusterState clusterState = clusterStateSupplier.get();
final List<ShardRouting> shards = getShards(primaryId, clusterState);
Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);

markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);

performOnReplicas(replicaRequest, primary.globalCheckpoint(), shards);
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
// we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
// is valid for this replication group. If we sampled in the reverse order, the global checkpoint might be based on a subset
// of the sampled replication group, and might have advanced further than the given replication group would allow.
// This would mean that some shards could learn about a global checkpoint that is higher than their local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
}

successfulShards.incrementAndGet(); // mark primary as successful
decPendingAndFinishIfNeeded();
}

private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<String> inSyncAllocationIds, List<ShardRouting> shards) {
if (inSyncAllocationIds.isEmpty() == false && shards.isEmpty() == false) {
Set<String> availableAllocationIds = shards.stream()
.map(ShardRouting::allocationId)
.filter(Objects::nonNull)
.map(AllocationId::getId)
.collect(Collectors.toSet());

// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
for (String allocationId : Sets.difference(inSyncAllocationIds, availableAllocationIds)) {
// mark copy as stale
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
}
private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<String> inSyncAllocationIds,
IndexShardRoutingTable indexShardRoutingTable) {
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
for (String allocationId : Sets.difference(inSyncAllocationIds, indexShardRoutingTable.getAllAllocationIds())) {
// mark copy as stale
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, replicaRequest.primaryTerm(),
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
}
}

private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint, final List<ShardRouting> shards) {
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
final IndexShardRoutingTable indexShardRoutingTable) {
final String localNodeId = primary.routingEntry().currentNodeId();
// If the index gets deleted after primary operation, we skip replication
for (final ShardRouting shard : shards) {
for (final ShardRouting shard : indexShardRoutingTable) {
if (shard.unassigned()) {
if (shard.primary() == false) {
totalShards.incrementAndGet();
}
assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard;
totalShards.incrementAndGet();
continue;
}

Expand Down Expand Up @@ -238,23 +228,11 @@ private void onPrimaryDemoted(Exception demotionFailure) {
*/
protected String checkActiveShardCount() {
final ShardId shardId = primary.routingEntry().shardId();
final String indexName = shardId.getIndexName();
final ClusterState state = clusterStateSupplier.get();
assert state != null : "replication operation must have access to the cluster state";
final ActiveShardCount waitForActiveShards = request.waitForActiveShards();
if (waitForActiveShards == ActiveShardCount.NONE) {
return null; // not waiting for any shards
}
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexName);
if (indexRoutingTable == null) {
logger.trace("[{}] index not found in the routing table", shardId);
return "Index " + indexName + " not found in the routing table";
}
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
if (shardRoutingTable == null) {
logger.trace("[{}] shard not found in the routing table", shardId);
return "Shard " + shardId + " not found in the routing table";
}
final IndexShardRoutingTable shardRoutingTable = primary.getReplicationGroup().getRoutingTable();
if (waitForActiveShards.enoughShardsActive(shardRoutingTable)) {
return null;
} else {
Expand All @@ -268,21 +246,6 @@ protected String checkActiveShardCount() {
}
}

/**
 * Looks up the in-sync allocation ids recorded in the index metadata for the given shard.
 *
 * @param shardId      the shard whose in-sync allocation ids are requested
 * @param clusterState the cluster state whose metadata is consulted
 * @return the in-sync allocation ids of the shard, or an empty set when the cluster state
 *         holds no metadata for the shard's index
 */
protected Set<String> getInSyncAllocationIds(ShardId shardId, ClusterState clusterState) {
    final IndexMetaData metaData = clusterState.metaData().index(shardId.getIndex());
    if (metaData == null) {
        // no metadata for this index in the given state: nothing is known to be in sync
        return Collections.emptySet();
    }
    return metaData.inSyncAllocationIds(shardId.id());
}

/**
 * Resolves the shard copies listed for the given shard in the routing table of the given cluster state.
 *
 * @param shardId the shard to look up
 * @param state   the cluster state whose routing table is consulted
 * @return the shard routings of the shard, or an empty list when the routing table has no entry for it
 */
protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
    final IndexShardRoutingTable table = state.getRoutingTable().shardRoutingTableOrNull(shardId);
    if (table == null) {
        // can be null if the index is deleted / closed on us..
        return Collections.emptyList();
    }
    return table.shards();
}

private void decPendingAndFinishIfNeeded() {
assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
if (pendingActions.decrementAndGet() == 0) {
Expand Down Expand Up @@ -371,6 +334,12 @@ public interface Primary<
*/
long globalCheckpoint();

/**
* Returns the current replication group on the primary shard. The replication operation reads the
* in-sync allocation ids and the shard routing table from the returned group to decide which copies
* to replicate to and which copies to mark as stale.
*
* @return the replication group
*/
ReplicationGroup getReplicationGroup();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
Expand Down Expand Up @@ -383,7 +384,7 @@ protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaReq
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
replicasProxy, clusterService::state, logger, actionName);
replicasProxy, logger, actionName);
}
}

Expand Down Expand Up @@ -629,7 +630,7 @@ public void onFailure(Exception e) {
}
}

private IndexShard getIndexShard(ShardId shardId) {
protected IndexShard getIndexShard(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
}
Expand Down Expand Up @@ -1006,6 +1007,11 @@ public long globalCheckpoint() {
return indexShard.getGlobalCheckpoint();
}

// Delegates to the underlying index shard to obtain the replication group.
@Override
public ReplicationGroup getReplicationGroup() {
return indexShard.getReplicationGroup();
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final List<ShardRouting> shards;
final List<ShardRouting> activeShards;
final List<ShardRouting> assignedShards;
final Set<String> allAllocationIds;
static final List<ShardRouting> NO_SHARDS = Collections.emptyList();
final boolean allShardsStarted;

Expand All @@ -84,6 +86,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
List<ShardRouting> activeShards = new ArrayList<>();
List<ShardRouting> assignedShards = new ArrayList<>();
List<ShardRouting> allInitializingShards = new ArrayList<>();
Set<String> allAllocationIds = new HashSet<>();
boolean allShardsStarted = true;
for (ShardRouting shard : shards) {
if (shard.primary()) {
Expand All @@ -100,9 +103,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(shard.getTargetRelocatingShard());
allAllocationIds.add(shard.getTargetRelocatingShard().allocationId().getId());
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
allAllocationIds.add(shard.allocationId().getId());
}
if (shard.state() != ShardRoutingState.STARTED) {
allShardsStarted = false;
Expand All @@ -119,6 +124,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
this.activeShards = Collections.unmodifiableList(activeShards);
this.assignedShards = Collections.unmodifiableList(assignedShards);
this.allInitializingShards = Collections.unmodifiableList(allInitializingShards);
this.allAllocationIds = Collections.unmodifiableSet(allAllocationIds);
}

/**
Expand Down Expand Up @@ -435,6 +441,25 @@ public boolean allShardsStarted() {
return allShardsStarted;
}

/**
 * Finds the shard routing in this table that carries the given allocation id. Both the assigned
 * shard copies and, for relocating copies, their relocation targets are considered.
 *
 * @param allocationId the allocation id to search for
 * @return the matching shard routing, or {@code null} if no assigned copy or relocation target has that id
 */
@Nullable
public ShardRouting getByAllocationId(String allocationId) {
    for (ShardRouting candidate : assignedShards()) {
        if (candidate.allocationId().getId().equals(allocationId)) {
            return candidate;
        }
        if (candidate.relocating() == false) {
            continue;
        }
        // a relocating copy also has a target copy with its own allocation id
        final ShardRouting relocationTarget = candidate.getTargetRelocatingShard();
        if (relocationTarget.allocationId().getId().equals(allocationId)) {
            return relocationTarget;
        }
    }
    return null;
}

/**
 * Returns the allocation ids collected when this routing table was built: the id of every shard
 * copy that is assigned to a node, plus the id of the relocation target of every relocating copy.
 *
 * @return an unmodifiable set of allocation ids
 */
public Set<String> getAllAllocationIds() {
    return this.allAllocationIds;
}

static class AttributesKey {

final String[] attributes;
Expand Down Expand Up @@ -634,7 +659,7 @@ public static IndexShardRoutingTable readFromThin(StreamInput in, Index index) t
}

public static void writeTo(IndexShardRoutingTable indexShard, StreamOutput out) throws IOException {
out.writeString(indexShard.shardId().getIndex().getName());
indexShard.shardId().getIndex().writeTo(out);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a serialization-breaking change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was apparently never used before

writeToThin(indexShard, out);
}

Expand All @@ -648,4 +673,19 @@ public static void writeToThin(IndexShardRoutingTable indexShard, StreamOutput o
}

}

/**
 * Renders this routing table as {@code IndexShardRoutingTable(<shardId>){<summary>, <summary>, ...}},
 * using the short summary of each shard routing in list order.
 */
@Override
public String toString() {
    final StringBuilder out = new StringBuilder();
    out.append("IndexShardRoutingTable(").append(shardId()).append("){");
    // the separator is empty before the first entry and ", " before every later one
    String separator = "";
    for (ShardRouting routing : shards) {
        out.append(separator).append(routing.shortSummary());
        separator = ", ";
    }
    return out.append("}").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,7 @@ public ShardRouting getByAllocationId(ShardId shardId, String allocationId) {
if (shardRoutingTable == null) {
return null;
}
for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
if (shardRouting.allocationId().getId().equals(allocationId)) {
return shardRouting;
}
if (shardRouting.relocating()) {
if (shardRouting.getTargetRelocatingShard().allocationId().getId().equals(allocationId)) {
return shardRouting.getTargetRelocatingShard();
}
}
}
return null;
return shardRoutingTable.getByAllocationId(allocationId);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public final class ShardRouting implements Writeable, ToXContent {
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
assert (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) == (recoverySource != null) : "recovery source only available on unassigned or initializing shard but was " + state;
assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary : "replica shards always recover from primary";
assert (currentNodeId == null) == (state == ShardRoutingState.UNASSIGNED) : "unassigned shard must not be assigned to a node " + this;
}

@Nullable
Expand Down
Loading