2727import org .elasticsearch .action .UnavailableShardsException ;
2828import org .elasticsearch .action .support .ActiveShardCount ;
2929import org .elasticsearch .action .support .TransportActions ;
30- import org .elasticsearch .cluster .ClusterState ;
31- import org .elasticsearch .cluster .metadata .IndexMetaData ;
3230import org .elasticsearch .cluster .routing .AllocationId ;
33- import org .elasticsearch .cluster .routing .IndexRoutingTable ;
3431import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
3532import org .elasticsearch .cluster .routing .ShardRouting ;
3633import org .elasticsearch .common .Nullable ;
3734import org .elasticsearch .common .io .stream .StreamInput ;
3835import org .elasticsearch .common .util .set .Sets ;
36+ import org .elasticsearch .index .shard .ReplicationGroup ;
3937import org .elasticsearch .index .shard .ShardId ;
4038import org .elasticsearch .rest .RestStatus ;
4139
4947import java .util .concurrent .atomic .AtomicBoolean ;
5048import java .util .concurrent .atomic .AtomicInteger ;
5149import java .util .function .Consumer ;
52- import java .util .function .Supplier ;
5350import java .util .stream .Collectors ;
51+ import java .util .stream .Stream ;
5452
5553public class ReplicationOperation <
5654 Request extends ReplicationRequest <Request >,
@@ -59,7 +57,6 @@ public class ReplicationOperation<
5957 > {
6058 private final Logger logger ;
6159 private final Request request ;
62- private final Supplier <ClusterState > clusterStateSupplier ;
6360 private final String opType ;
6461 private final AtomicInteger totalShards = new AtomicInteger ();
6562 /**
@@ -86,13 +83,12 @@ public class ReplicationOperation<
8683 public ReplicationOperation (Request request , Primary <Request , ReplicaRequest , PrimaryResultT > primary ,
8784 ActionListener <PrimaryResultT > listener ,
8885 Replicas <ReplicaRequest > replicas ,
89- Supplier < ClusterState > clusterStateSupplier , Logger logger , String opType ) {
86+ Logger logger , String opType ) {
9087 this .replicasProxy = replicas ;
9188 this .primary = primary ;
9289 this .resultListener = listener ;
9390 this .logger = logger ;
9491 this .request = request ;
95- this .clusterStateSupplier = clusterStateSupplier ;
9692 this .opType = opType ;
9793 }
9894
@@ -117,51 +113,45 @@ public void execute() throws Exception {
117113 logger .trace ("[{}] op [{}] completed on primary for request [{}]" , primaryId , opType , request );
118114 }
119115
120- // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
116+ // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
121117 // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
122- // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
123- ClusterState clusterState = clusterStateSupplier .get ();
124- final List <ShardRouting > shards = getShards (primaryId , clusterState );
125- Set <String > inSyncAllocationIds = getInSyncAllocationIds (primaryId , clusterState );
126-
127- markUnavailableShardsAsStale (replicaRequest , inSyncAllocationIds , shards );
128-
129- performOnReplicas (replicaRequest , primary .globalCheckpoint (), shards );
118+ // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
119+ // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
120+ // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
121+ // of the sampled replication group, and advanced further than what the given replication group would allow it to.
122+ // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
123+ final long globalCheckpoint = primary .globalCheckpoint ();
124+ final ReplicationGroup replicationGroup = primary .getReplicationGroup ();
125+ markUnavailableShardsAsStale (replicaRequest , replicationGroup .getInSyncAllocationIds (), replicationGroup .getRoutingTable ());
126+ performOnReplicas (replicaRequest , globalCheckpoint , replicationGroup .getRoutingTable ());
130127 }
131128
132129 successfulShards .incrementAndGet (); // mark primary as successful
133130 decPendingAndFinishIfNeeded ();
134131 }
135132
136- private void markUnavailableShardsAsStale (ReplicaRequest replicaRequest , Set <String > inSyncAllocationIds , List <ShardRouting > shards ) {
137- if (inSyncAllocationIds .isEmpty () == false && shards .isEmpty () == false ) {
138- Set <String > availableAllocationIds = shards .stream ()
139- .map (ShardRouting ::allocationId )
140- .filter (Objects ::nonNull )
141- .map (AllocationId ::getId )
142- .collect (Collectors .toSet ());
143-
144- // if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
145- for (String allocationId : Sets .difference (inSyncAllocationIds , availableAllocationIds )) {
146- // mark copy as stale
147- pendingActions .incrementAndGet ();
148- replicasProxy .markShardCopyAsStaleIfNeeded (replicaRequest .shardId (), allocationId , replicaRequest .primaryTerm (),
149- ReplicationOperation .this ::decPendingAndFinishIfNeeded ,
150- ReplicationOperation .this ::onPrimaryDemoted ,
151- throwable -> decPendingAndFinishIfNeeded ()
152- );
153- }
133+ private void markUnavailableShardsAsStale (ReplicaRequest replicaRequest , Set <String > inSyncAllocationIds ,
134+ IndexShardRoutingTable indexShardRoutingTable ) {
135+ // if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
136+ for (String allocationId : Sets .difference (inSyncAllocationIds , indexShardRoutingTable .getAllAllocationIds ())) {
137+ // mark copy as stale
138+ pendingActions .incrementAndGet ();
139+ replicasProxy .markShardCopyAsStaleIfNeeded (replicaRequest .shardId (), allocationId , replicaRequest .primaryTerm (),
140+ ReplicationOperation .this ::decPendingAndFinishIfNeeded ,
141+ ReplicationOperation .this ::onPrimaryDemoted ,
142+ throwable -> decPendingAndFinishIfNeeded ()
143+ );
154144 }
155145 }
156146
157- private void performOnReplicas (final ReplicaRequest replicaRequest , final long globalCheckpoint , final List <ShardRouting > shards ) {
147+ private void performOnReplicas (final ReplicaRequest replicaRequest , final long globalCheckpoint ,
148+ final IndexShardRoutingTable indexShardRoutingTable ) {
158149 final String localNodeId = primary .routingEntry ().currentNodeId ();
159150 // If the index gets deleted after primary operation, we skip replication
160- for (final ShardRouting shard : shards ) {
151+ for (final ShardRouting shard : indexShardRoutingTable ) {
161152 if (shard .unassigned ()) {
162- if (shard .primary () == false ) {
163- totalShards .incrementAndGet ();
164- }
153+ assert shard .primary () == false : "primary shard should not be unassigned in a replication group: " + shard ;
154+ totalShards .incrementAndGet ();
165155 continue ;
166156 }
167157
@@ -238,23 +228,11 @@ private void onPrimaryDemoted(Exception demotionFailure) {
238228 */
239229 protected String checkActiveShardCount () {
240230 final ShardId shardId = primary .routingEntry ().shardId ();
241- final String indexName = shardId .getIndexName ();
242- final ClusterState state = clusterStateSupplier .get ();
243- assert state != null : "replication operation must have access to the cluster state" ;
244231 final ActiveShardCount waitForActiveShards = request .waitForActiveShards ();
245232 if (waitForActiveShards == ActiveShardCount .NONE ) {
246233 return null ; // not waiting for any shards
247234 }
248- IndexRoutingTable indexRoutingTable = state .getRoutingTable ().index (indexName );
249- if (indexRoutingTable == null ) {
250- logger .trace ("[{}] index not found in the routing table" , shardId );
251- return "Index " + indexName + " not found in the routing table" ;
252- }
253- IndexShardRoutingTable shardRoutingTable = indexRoutingTable .shard (shardId .getId ());
254- if (shardRoutingTable == null ) {
255- logger .trace ("[{}] shard not found in the routing table" , shardId );
256- return "Shard " + shardId + " not found in the routing table" ;
257- }
235+ final IndexShardRoutingTable shardRoutingTable = primary .getReplicationGroup ().getRoutingTable ();
258236 if (waitForActiveShards .enoughShardsActive (shardRoutingTable )) {
259237 return null ;
260238 } else {
@@ -268,21 +246,6 @@ protected String checkActiveShardCount() {
268246 }
269247 }
270248
271- protected Set <String > getInSyncAllocationIds (ShardId shardId , ClusterState clusterState ) {
272- IndexMetaData indexMetaData = clusterState .metaData ().index (shardId .getIndex ());
273- if (indexMetaData != null ) {
274- return indexMetaData .inSyncAllocationIds (shardId .id ());
275- }
276- return Collections .emptySet ();
277- }
278-
279- protected List <ShardRouting > getShards (ShardId shardId , ClusterState state ) {
280- // can be null if the index is deleted / closed on us..
281- final IndexShardRoutingTable shardRoutingTable = state .getRoutingTable ().shardRoutingTableOrNull (shardId );
282- List <ShardRouting > shards = shardRoutingTable == null ? Collections .emptyList () : shardRoutingTable .shards ();
283- return shards ;
284- }
285-
286249 private void decPendingAndFinishIfNeeded () {
287250 assert pendingActions .get () > 0 : "pending action count goes below 0 for request [" + request + "]" ;
288251 if (pendingActions .decrementAndGet () == 0 ) {
@@ -371,6 +334,12 @@ public interface Primary<
371334 */
372335 long globalCheckpoint ();
373336
337+ /**
338+ * Returns the current replication group on the primary shard
339+ *
340+ * @return the replication group
341+ */
342+ ReplicationGroup getReplicationGroup ();
374343 }
375344
376345 /**
0 commit comments