Skip to content

Commit 1282069

Browse files
committed
Avoid losing ops in file-based recovery
When a primary is relocated from an old node to a new node, it can have ops in its translog that do not have a sequence number assigned. When a file-based recovery is started, this can lead to skipping these ops when replaying the translog due to a bug in the recovery logic. This commit addresses this bug and adds a test in the BWC tests.
1 parent c34b63d commit 1282069

File tree

2 files changed

+47
-10
lines changed

2 files changed

+47
-10
lines changed

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,10 @@ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot sna
497497
throw new IndexShardClosedException(request.shardId());
498498
}
499499
cancellableThreads.checkForCancel();
500-
// we have to send older ops for which no sequence number was assigned, and any ops after the starting sequence number
501-
if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue;
500+
// if we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
501+
// any ops before the starting sequence number
502+
final long seqNo = operation.seqNo();
503+
if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) continue;
502504
operations.add(operation);
503505
ops++;
504506
size += operation.estimateSize();

qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@
2727
import org.elasticsearch.cluster.metadata.IndexMetaData;
2828
import org.elasticsearch.common.Strings;
2929
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.common.xcontent.XContentHelper;
3031
import org.elasticsearch.common.xcontent.XContentType;
32+
import org.elasticsearch.common.xcontent.json.JsonXContent;
3133
import org.elasticsearch.index.IndexSettings;
3234
import org.elasticsearch.index.seqno.SeqNoStats;
3335
import org.elasticsearch.index.seqno.SequenceNumbersService;
3436
import org.elasticsearch.test.rest.ESRestTestCase;
3537
import org.elasticsearch.test.rest.yaml.ObjectPath;
3638

3739
import java.io.IOException;
40+
import java.io.InputStream;
3841
import java.nio.charset.StandardCharsets;
3942
import java.util.ArrayList;
4043
import java.util.Collections;
@@ -109,26 +112,58 @@ public void testSeqNoCheckpoints() throws Exception {
109112
createIndex(index, settings.build());
110113
try (RestClient newNodeClient = buildClient(restClientSettings(),
111114
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
112-
int numDocs = indexDocs(index, 0, randomInt(5));
115+
int numDocs = 0;
116+
final int numberOfInitialDocs = 1 + randomInt(5);
117+
logger.info("indexing [{}] docs initially", numberOfInitialDocs);
118+
numDocs += indexDocs(index, 0, numberOfInitialDocs);
113119
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);
114-
115120
logger.info("allowing shards on all nodes");
116121
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
117122
ensureGreen();
118-
logger.info("indexing some more docs");
119-
numDocs += indexDocs(index, numDocs, randomInt(5));
123+
final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5);
124+
logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes);
125+
numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes);
120126
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);
121-
logger.info("moving primary to new node");
122127
Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
128+
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
123129
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
124130
ensureGreen();
125-
logger.info("indexing some more docs");
126-
int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5));
127-
numDocs += numDocsOnNewPrimary;
131+
int numDocsOnNewPrimary = 0;
132+
final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5);
133+
logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
134+
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary);
135+
numDocs += numberOfDocsAfterMovingPrimary;
136+
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
137+
/*
138+
* Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in
139+
* the recovery code.
140+
*/
141+
logger.info("setting number of replicas to 0");
142+
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0));
143+
final int numberOfDocsAfterDroppingReplicas = 1 + randomInt(5);
144+
logger.info("indexing [{}] docs after setting number of replicas to 0", numberOfDocsAfterDroppingReplicas);
145+
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterDroppingReplicas);
146+
numDocs += numberOfDocsAfterDroppingReplicas;
147+
logger.info("setting number of replicas to 1");
148+
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
149+
ensureGreen();
150+
assertOK(client().performRequest("POST", index + "/_refresh"));
151+
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
152+
assertCount(index, "_primary", numDocs);
153+
assertCount(index, "_replica", numDocs);
128154
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
129155
}
130156
}
131157

158+
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
159+
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
160+
assertOK(response);
161+
final InputStream content = response.getEntity().getContent();
162+
final int actualCount =
163+
Integer.parseInt(XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false).get("count").toString());
164+
assertThat(actualCount, equalTo(expectedCount));
165+
}
166+
132167
private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception {
133168
assertBusy(() -> {
134169
try {

0 commit comments

Comments
 (0)