Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,10 @@ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot sna
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
// we have to send older ops for which no sequence number was assigned, and any ops after the starting sequence number
if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue;
// if we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
// any ops before the starting sequence number
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) continue;
operations.add(operation);
ops++;
size += operation.estimateSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -109,26 +112,58 @@ public void testSeqNoCheckpoints() throws Exception {
createIndex(index, settings.build());
try (RestClient newNodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
int numDocs = indexDocs(index, 0, randomInt(5));
int numDocs = 0;
final int numberOfInitialDocs = 1 + randomInt(5);
logger.info("indexing [{}] docs initially", numberOfInitialDocs);
numDocs += indexDocs(index, 0, numberOfInitialDocs);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);

logger.info("allowing shards on all nodes");
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
ensureGreen();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we add assert counts here too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 0112f90.

logger.info("indexing some more docs");
numDocs += indexDocs(index, numDocs, randomInt(5));
final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5);
logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes);
numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);
logger.info("moving primary to new node");
Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
ensureGreen();
logger.info("indexing some more docs");
int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5));
numDocs += numDocsOnNewPrimary;
int numDocsOnNewPrimary = 0;
final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5);
logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary);
numDocs += numberOfDocsAfterMovingPrimary;
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
/*
* Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in
* the recovery code.
*/
logger.info("setting number of replicas to 0");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0));
final int numberOfDocsAfterDroppingReplicas = 1 + randomInt(5);
logger.info("indexing [{}] docs after setting number of replicas to 0", numberOfDocsAfterDroppingReplicas);
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterDroppingReplicas);
numDocs += numberOfDocsAfterDroppingReplicas;
logger.info("setting number of replicas to 1");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
ensureGreen();
assertOK(client().performRequest("POST", index + "/_refresh"));
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
assertCount(index, "_primary", numDocs);
assertCount(index, "_replica", numDocs);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
}
}

private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
assertOK(response);
final InputStream content = response.getEntity().getContent();
final int actualCount =
Integer.parseInt(XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false).get("count").toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance to use something like ObjectPath.evaluate(shard, "seq_no.local_checkpoint")?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 4430caa.

assertThat(actualCount, equalTo(expectedCount));
}

private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception {
assertBusy(() -> {
try {
Expand Down