Skip to content

Commit 8960522

Browse files
committed
Add assertion on number of recovered ops
This commit adds an assertion on the number of ops recovered from the translog in the test for recovery of a disconnected replica.
1 parent 81a1e1c commit 8960522

File tree

3 files changed

+16
-9
lines changed

3 files changed

+16
-9
lines changed

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl
218218
}
219219
return tracker.getCheckpoint() >= endingSeqNo;
220220
}
221-
return false;
221+
return true;
222222
}
223223

224224
/**
@@ -417,7 +417,7 @@ void phase2(final Translog.Snapshot snapshot) throws IOException {
417417
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());
418418

419419
// send all the snapshot's translog operations to the target
420-
final int totalOperations = sendSnapshot(snapshot);
420+
final int totalOperations = sendSnapshot(request.startingSeqNo(), snapshot);
421421

422422
stopWatch.stop();
423423
logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime());
@@ -465,15 +465,17 @@ public void finalizeRecovery() {
465465
}
466466

467467
/**
468-
* Send the given snapshot's operations to this handler's target node.
468+
* Send the given snapshot's operations with a sequence number greater than or equal to the specified starting sequence number to this handler's
469+
* target node.
469470
* <p>
470471
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
471472
*
472-
* @param snapshot the translog snapshot to replay operations from
473+
* @param startingSeqNo the starting sequence number; only operations with a sequence number greater than or equal to this will be sent
474+
* @param snapshot the translog snapshot to replay operations from
473475
* @return the total number of translog operations that were sent
474476
* @throws IOException if an I/O exception occurred reading the translog snapshot
475477
*/
476-
protected int sendSnapshot(final Translog.Snapshot snapshot) throws IOException {
478+
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
477479
int ops = 0;
478480
long size = 0;
479481
int totalOperations = 0;
@@ -490,6 +492,7 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) throws IOException
490492
throw new IndexShardClosedException(request.shardId());
491493
}
492494
cancellableThreads.checkForCancel();
495+
if (operation.seqNo() < startingSeqNo) continue;
493496
operations.add(operation);
494497
ops++;
495498
size += operation.estimateSize();

core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
8383
}
8484

8585
@Override
86-
protected int sendSnapshot(final Translog.Snapshot snapshot) {
86+
protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) {
8787
logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}", shard.shardId(), request.targetNode());
8888
return 0;
8989
}

core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.Future;
3737

3838
import static org.hamcrest.Matchers.empty;
39+
import static org.hamcrest.Matchers.equalTo;
3940
import static org.hamcrest.Matchers.not;
4041

4142
public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
@@ -68,6 +69,7 @@ public void testRecoveryOfDisconnectedReplica() throws Exception {
6869
shards.flush();
6970
shards.getPrimary().updateGlobalCheckpointOnPrimary();
7071
final IndexShard originalReplica = shards.getReplicas().get(0);
72+
long replicaCommittedLocalCheckpoint = docs - 1;
7173
boolean replicaHasDocsSinceLastFlushedCheckpoint = false;
7274
for (int i = 0; i < randomInt(2); i++) {
7375
final int indexedDocs = shards.indexDocs(randomInt(5));
@@ -79,14 +81,13 @@ public void testRecoveryOfDisconnectedReplica() throws Exception {
7981
final boolean flush = randomBoolean();
8082
if (flush) {
8183
originalReplica.flush(new FlushRequest());
84+
replicaHasDocsSinceLastFlushedCheckpoint = false;
85+
replicaCommittedLocalCheckpoint = docs - 1;
8286
}
8387

8488
final boolean sync = randomBoolean();
8589
if (sync) {
8690
shards.getPrimary().updateGlobalCheckpointOnPrimary();
87-
if (flush) {
88-
replicaHasDocsSinceLastFlushedCheckpoint = false;
89-
}
9091
}
9192
}
9293

@@ -112,6 +113,9 @@ public void testRecoveryOfDisconnectedReplica() throws Exception {
112113
assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty()));
113114
} else {
114115
assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty());
116+
assertThat(
117+
recoveredReplica.recoveryState().getTranslog().recoveredOperations(),
118+
equalTo(Math.toIntExact(docs - (replicaCommittedLocalCheckpoint + 1))));
115119
}
116120

117121
docs += shards.indexDocs(randomInt(5));

0 commit comments

Comments
 (0)