@@ -67,7 +67,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
6767 private volatile int numConcurrentWrites = 0 ;
6868 private volatile long followerGlobalCheckpoint = 0 ;
6969 private volatile long currentIndexMetadataVersion = 0 ;
70- private final Queue <Translog .Operation > buffer = new PriorityQueue <>(Comparator .comparing (Translog .Operation ::seqNo ). reversed () );
70+ private final Queue <Translog .Operation > buffer = new PriorityQueue <>(Comparator .comparing (Translog .Operation ::seqNo ));
7171
7272 ShardFollowNodeTask (long id , String type , String action , String description , TaskId parentTask , Map <String , String > headers ,
7373 ShardFollowTask params , BiConsumer <TimeValue , Runnable > scheduler ) {
@@ -78,10 +78,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
7878 this .idleShardChangesRequestDelay = params .getIdleShardRetryDelay ();
7979 }
8080
81- void start (long followerGlobalCheckpoint ) {
81+ void start (long leaderGlobalCheckpoint , long followerGlobalCheckpoint ) {
8282 this .lastRequestedSeqno = followerGlobalCheckpoint ;
8383 this .followerGlobalCheckpoint = followerGlobalCheckpoint ;
84- this .leaderGlobalCheckpoint = followerGlobalCheckpoint ;
84+ this .leaderGlobalCheckpoint = leaderGlobalCheckpoint ;
8585
8686 // Forcefully updates follower mapping, this gets us the leader imd version and
8787 // makes sure that leader and follower mapping are identical.
@@ -93,7 +93,7 @@ void start(long followerGlobalCheckpoint) {
9393 });
9494 }
9595
96- private synchronized void coordinateReads () {
96+ synchronized void coordinateReads () {
9797 if (isStopped ()) {
9898 LOGGER .info ("{} shard follow task has been stopped" , params .getFollowShardId ());
9999 return ;
@@ -105,7 +105,8 @@ private synchronized void coordinateReads() {
105105 while (hasReadBudget () && lastRequestedSeqno < leaderGlobalCheckpoint ) {
106106 numConcurrentReads ++;
107107 long from = lastRequestedSeqno + 1 ;
108- long maxRequiredSeqno = Math .min (leaderGlobalCheckpoint , from + maxBatchOperationCount );
108+ // -1 is needed, because maxRequiredSeqno is inclusive
109+ long maxRequiredSeqno = Math .min (leaderGlobalCheckpoint , (from + maxBatchOperationCount ) - 1 );
109110 LOGGER .trace ("{}[{}] read [{}/{}]" , params .getFollowShardId (), numConcurrentReads , maxRequiredSeqno , maxBatchOperationCount );
110111 sendShardChangesRequest (from , maxBatchOperationCount , maxRequiredSeqno );
111112 lastRequestedSeqno = maxRequiredSeqno ;
@@ -137,6 +138,11 @@ private boolean hasReadBudget() {
137138 }
138139
139140 private synchronized void coordinateWrites () {
141+ if (isStopped ()) {
142+ LOGGER .info ("{} shard follow task has been stopped" , params .getFollowShardId ());
143+ return ;
144+ }
145+
140146 while (hasWriteBudget () && buffer .isEmpty () == false ) {
141147 long sumEstimatedSize = 0L ;
142148 int length = Math .min (params .getMaxBatchOperationCount (), buffer .size ());
@@ -176,48 +182,48 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
176182 e -> handleFailure (e , retryCounter , () -> sendShardChangesRequest (from , maxOperationCount , maxRequiredSeqNo , retryCounter )));
177183 }
178184
179- private void handleReadResponse (long from , long maxRequiredSeqNo , ShardChangesAction .Response response ) {
180- maybeUpdateMapping (response .getIndexMetadataVersion (), () -> {
181- synchronized (ShardFollowNodeTask .this ) {
182- leaderGlobalCheckpoint = Math .max (leaderGlobalCheckpoint , response .getGlobalCheckpoint ());
183- final long newMinRequiredSeqNo ;
184- if (response .getOperations ().length == 0 ) {
185- newMinRequiredSeqNo = from ;
186- } else {
187- assert response .getOperations ()[0 ].seqNo () == from :
188- "first operation is not what we asked for. From is [" + from + "], got " + response .getOperations ()[0 ];
189- buffer .addAll (Arrays .asList (response .getOperations ()));
190- final long maxSeqNo = response .getOperations ()[response .getOperations ().length - 1 ].seqNo ();
191- assert maxSeqNo ==
192- Arrays .stream (response .getOperations ()).mapToLong (Translog .Operation ::seqNo ).max ().getAsLong ();
193- newMinRequiredSeqNo = maxSeqNo + 1 ;
194- // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
195- lastRequestedSeqno = Math .max (lastRequestedSeqno , maxSeqNo );
196- assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno +
197- "] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]" ;
198- coordinateWrites ();
199- }
185+ void handleReadResponse (long from , long maxRequiredSeqNo , ShardChangesAction .Response response ) {
186+ maybeUpdateMapping (response .getIndexMetadataVersion (), () -> innerHandleReadResponse (from , maxRequiredSeqNo , response ));
187+ }
200188
201- if (newMinRequiredSeqNo < maxRequiredSeqNo ) {
202- int newSize = (int ) (maxRequiredSeqNo - newMinRequiredSeqNo ) + 1 ;
203- LOGGER .trace ("{} received [{}] ops, still missing [{}/{}], continuing to read..." ,
204- params .getFollowShardId (), response .getOperations ().length , newMinRequiredSeqNo , maxRequiredSeqNo );
205- sendShardChangesRequest (newMinRequiredSeqNo , newSize , maxRequiredSeqNo );
206- } else {
207- // read is completed, decrement
208- numConcurrentReads --;
209- if (response .getOperations ().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqno ) {
210- // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay
211- // future requests
212- LOGGER .trace ("{} received no ops and no known ops to fetch, scheduling to coordinate reads" ,
213- params .getFollowShardId ());
214- scheduler .accept (idleShardChangesRequestDelay , this ::coordinateReads );
215- } else {
216- coordinateReads ();
217- }
218- }
189+ synchronized void innerHandleReadResponse (long from , long maxRequiredSeqNo , ShardChangesAction .Response response ) {
190+ leaderGlobalCheckpoint = Math .max (leaderGlobalCheckpoint , response .getGlobalCheckpoint ());
191+ final long newMinRequiredSeqNo ;
192+ if (response .getOperations ().length == 0 ) {
193+ newMinRequiredSeqNo = from ;
194+ } else {
195+ assert response .getOperations ()[0 ].seqNo () == from :
196+ "first operation is not what we asked for. From is [" + from + "], got " + response .getOperations ()[0 ];
197+ buffer .addAll (Arrays .asList (response .getOperations ()));
198+ final long maxSeqNo = response .getOperations ()[response .getOperations ().length - 1 ].seqNo ();
199+ assert maxSeqNo ==
200+ Arrays .stream (response .getOperations ()).mapToLong (Translog .Operation ::seqNo ).max ().getAsLong ();
201+ newMinRequiredSeqNo = maxSeqNo + 1 ;
202+ // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
203+ lastRequestedSeqno = Math .max (lastRequestedSeqno , maxSeqNo );
204+ assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno +
205+ "] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]" ;
206+ coordinateWrites ();
207+ }
208+
209+ if (newMinRequiredSeqNo < maxRequiredSeqNo && isStopped () == false ) {
210+ int newSize = (int ) (maxRequiredSeqNo - newMinRequiredSeqNo ) + 1 ;
211+ LOGGER .trace ("{} received [{}] ops, still missing [{}/{}], continuing to read..." ,
212+ params .getFollowShardId (), response .getOperations ().length , newMinRequiredSeqNo , maxRequiredSeqNo );
213+ sendShardChangesRequest (newMinRequiredSeqNo , newSize , maxRequiredSeqNo );
214+ } else {
215+ // read is completed, decrement
216+ numConcurrentReads --;
217+ if (response .getOperations ().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqno ) {
218+ // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay
219+ // future requests
220+ LOGGER .trace ("{} received no ops and no known ops to fetch, scheduling to coordinate reads" ,
221+ params .getFollowShardId ());
222+ scheduler .accept (idleShardChangesRequestDelay , this ::coordinateReads );
223+ } else {
224+ coordinateReads ();
219225 }
220- });
226+ }
221227 }
222228
223229 private void sendBulkShardOperationsRequest (List <Translog .Operation > operations ) {
@@ -306,7 +312,8 @@ protected boolean isStopped() {
306312
/**
 * Builds a {@code Status} from the task's current field values: leader global checkpoint,
 * last requested seqno, follower global checkpoint and the concurrent read/write counters.
 * In the post-patch form (the {@code +} lines) currentIndexMetadataVersion is passed as a
 * sixth argument; the {@code -} line is the pre-patch five-argument call.
 */
307313 @ Override
308314 public Status getStatus () {
309- return new Status (leaderGlobalCheckpoint , lastRequestedSeqno , followerGlobalCheckpoint , numConcurrentReads , numConcurrentWrites );
315+ return new Status (leaderGlobalCheckpoint , lastRequestedSeqno , followerGlobalCheckpoint , numConcurrentReads , numConcurrentWrites ,
316+ currentIndexMetadataVersion );
310317 }
311318
312319 public static class Status implements Task .Status {
@@ -318,31 +325,35 @@ public static class Status implements Task.Status {
318325 static final ParseField LAST_REQUESTED_SEQNO_FIELD = new ParseField ("last_requested_seqno" );
319326 static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField ("number_of_concurrent_reads" );
320327 static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField ("number_of_concurrent_writes" );
328+ static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField ("index_metadata_version" );
321329
// Parser that builds a Status from the constructor arguments declared in the static block below.
// NOTE(review): the casts on args[0..5] (long, long, long, int, int, long) appear to rely on the
// declaration order in the static initializer — confirm against ConstructingObjectParser.
// The `+` line adds args[5], matching the newly declared INDEX_METADATA_VERSION_FIELD.
322330 static final ConstructingObjectParser <Status , Void > PARSER = new ConstructingObjectParser <>(NAME ,
323- args -> new Status ((long ) args [0 ], (long ) args [1 ], (long ) args [2 ], (int ) args [3 ], (int ) args [4 ]));
331+ args -> new Status ((long ) args [0 ], (long ) args [1 ], (long ) args [2 ], (int ) args [3 ], (int ) args [4 ], ( long ) args [ 5 ] ));
324332
// Registers one constructor argument per status field: three longs, two ints, then the
// index metadata version as a long.
325333 static {
326334 PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_GLOBAL_CHECKPOINT_FIELD );
327335 PARSER .declareLong (ConstructingObjectParser .constructorArg (), LAST_REQUESTED_SEQNO_FIELD );
328336 PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_GLOBAL_CHECKPOINT_FIELD );
329337 PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_READS_FIELD );
330338 PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_WRITES_FIELD );
339+ PARSER .declareLong (ConstructingObjectParser .constructorArg (), INDEX_METADATA_VERSION_FIELD );
331340 }
332341
333342 private final long leaderGlobalCheckpoint ;
334343 private final long lastRequestedSeqno ;
335344 private final long followerGlobalCheckpoint ;
336345 private final int numberOfConcurrentReads ;
337346 private final int numberOfConcurrentWrites ;
347+ private final long indexMetadataVersion ;
338348
/**
 * Creates a status by copying each argument into the field of the same name.
 * The post-patch signature (the {@code +} lines) adds {@code indexMetadataVersion} as the
 * last parameter; the {@code -} line is the pre-patch parameter list without it.
 *
 * @param leaderGlobalCheckpoint   value stored in {@code leaderGlobalCheckpoint}
 * @param lastRequestedSeqno       value stored in {@code lastRequestedSeqno}
 * @param followerGlobalCheckpoint value stored in {@code followerGlobalCheckpoint}
 * @param numberOfConcurrentReads  value stored in {@code numberOfConcurrentReads}
 * @param numberOfConcurrentWrites value stored in {@code numberOfConcurrentWrites}
 * @param indexMetadataVersion     value stored in {@code indexMetadataVersion}
 */
339349 Status (long leaderGlobalCheckpoint , long lastRequestedSeqno , long followerGlobalCheckpoint ,
340- int numberOfConcurrentReads , int numberOfConcurrentWrites ) {
350+ int numberOfConcurrentReads , int numberOfConcurrentWrites , long indexMetadataVersion ) {
341351 this .leaderGlobalCheckpoint = leaderGlobalCheckpoint ;
342352 this .lastRequestedSeqno = lastRequestedSeqno ;
343353 this .followerGlobalCheckpoint = followerGlobalCheckpoint ;
344354 this .numberOfConcurrentReads = numberOfConcurrentReads ;
345355 this .numberOfConcurrentWrites = numberOfConcurrentWrites ;
356+ this .indexMetadataVersion = indexMetadataVersion ;
346357 }
347358
348359 public Status (StreamInput in ) throws IOException {
@@ -351,6 +362,7 @@ public Status(StreamInput in) throws IOException {
351362 this .followerGlobalCheckpoint = in .readZLong ();
352363 this .numberOfConcurrentReads = in .readVInt ();
353364 this .numberOfConcurrentWrites = in .readVInt ();
365+ this .indexMetadataVersion = in .readVLong ();
354366 }
355367
356368 public long getLeaderGlobalCheckpoint () {
@@ -373,6 +385,10 @@ public int getNumberOfConcurrentWrites() {
373385 return numberOfConcurrentWrites ;
374386 }
375387
/**
 * Accessor added by this patch.
 *
 * @return the {@code indexMetadataVersion} value held by this status
 */
388+ public long getIndexMetadataVersion () {
389+ return indexMetadataVersion ;
390+ }
391+
376392 @ Override
377393 public String getWriteableName () {
378394 return NAME ;
@@ -385,6 +401,7 @@ public void writeTo(StreamOutput out) throws IOException {
385401 out .writeZLong (followerGlobalCheckpoint );
386402 out .writeVInt (numberOfConcurrentReads );
387403 out .writeVInt (numberOfConcurrentWrites );
404+ out .writeVLong (indexMetadataVersion );
388405 }
389406
390407 @ Override
@@ -396,6 +413,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
396413 builder .field (LAST_REQUESTED_SEQNO_FIELD .getPreferredName (), lastRequestedSeqno );
397414 builder .field (NUMBER_OF_CONCURRENT_READS_FIELD .getPreferredName (), numberOfConcurrentReads );
398415 builder .field (NUMBER_OF_CONCURRENT_WRITES_FIELD .getPreferredName (), numberOfConcurrentWrites );
416+ builder .field (INDEX_METADATA_VERSION_FIELD .getPreferredName (), indexMetadataVersion );
399417 }
400418 builder .endObject ();
401419 return builder ;
@@ -414,13 +432,14 @@ public boolean equals(Object o) {
414432 lastRequestedSeqno == status .lastRequestedSeqno &&
415433 followerGlobalCheckpoint == status .followerGlobalCheckpoint &&
416434 numberOfConcurrentReads == status .numberOfConcurrentReads &&
417- numberOfConcurrentWrites == status .numberOfConcurrentWrites ;
435+ numberOfConcurrentWrites == status .numberOfConcurrentWrites &&
436+ indexMetadataVersion == status .indexMetadataVersion ;
418437 }
419438
/**
 * Combines the status fields with {@code Objects.hash}. The post-patch form (the {@code +}
 * line) adds {@code indexMetadataVersion}, which the visible part of {@code equals} also
 * compares after this patch.
 */
420439 @ Override
421440 public int hashCode () {
422441 return Objects .hash (leaderGlobalCheckpoint , lastRequestedSeqno , followerGlobalCheckpoint , numberOfConcurrentReads ,
423- numberOfConcurrentWrites );
442+ numberOfConcurrentWrites , indexMetadataVersion );
424443 }
425444
426445 public String toString () {
0 commit comments