Skip to content

Commit 6e99402

Browse files
authored
Avoid losing ops in file-based recovery
When a primary is relocated from an old node to a new node, it can have ops in its translog that do not have a sequence number assigned. When a file-based recovery is started, this can lead to skipping these ops when replaying the translog due to a bug in the recovery logic. This commit addresses this bug and adds a test in the BWC tests. Relates #22945
1 parent fb8bdbc commit 6e99402

File tree

3 files changed

+130
-11
lines changed

3 files changed

+130
-11
lines changed

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,10 @@ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot sna
497497
throw new IndexShardClosedException(request.shardId());
498498
}
499499
cancellableThreads.checkForCancel();
500-
// we have to send older ops for which no sequence number was assigned, and any ops after the starting sequence number
501-
if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue;
500+
// if we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
501+
// any ops before the starting sequence number
502+
final long seqNo = operation.seqNo();
503+
if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) continue;
502504
operations.add(operation);
503505
ops++;
504506
size += operation.estimateSize();

core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020

2121
import org.apache.lucene.document.Document;
2222
import org.apache.lucene.document.Field;
23+
import org.apache.lucene.document.NumericDocValuesField;
2324
import org.apache.lucene.document.StringField;
2425
import org.apache.lucene.document.TextField;
2526
import org.apache.lucene.index.CorruptIndexException;
2627
import org.apache.lucene.index.DirectoryReader;
2728
import org.apache.lucene.index.IndexCommit;
2829
import org.apache.lucene.index.IndexReader;
2930
import org.apache.lucene.index.RandomIndexWriter;
31+
import org.apache.lucene.index.Term;
3032
import org.apache.lucene.store.BaseDirectoryWrapper;
3133
import org.apache.lucene.store.Directory;
3234
import org.apache.lucene.store.IOContext;
@@ -35,13 +37,23 @@
3537
import org.elasticsearch.Version;
3638
import org.elasticsearch.cluster.metadata.IndexMetaData;
3739
import org.elasticsearch.cluster.node.DiscoveryNode;
40+
import org.elasticsearch.common.bytes.BytesArray;
41+
import org.elasticsearch.common.bytes.BytesReference;
3842
import org.elasticsearch.common.io.FileSystemUtils;
3943
import org.elasticsearch.common.lease.Releasable;
4044
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
45+
import org.elasticsearch.common.lucene.uid.Versions;
4146
import org.elasticsearch.common.settings.ClusterSettings;
4247
import org.elasticsearch.common.settings.Settings;
4348
import org.elasticsearch.index.IndexSettings;
49+
import org.elasticsearch.index.engine.Engine;
4450
import org.elasticsearch.index.engine.SegmentsStats;
51+
import org.elasticsearch.index.mapper.Mapping;
52+
import org.elasticsearch.index.mapper.ParseContext;
53+
import org.elasticsearch.index.mapper.ParsedDocument;
54+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
55+
import org.elasticsearch.index.mapper.Uid;
56+
import org.elasticsearch.index.mapper.UidFieldMapper;
4557
import org.elasticsearch.index.seqno.SeqNoStats;
4658
import org.elasticsearch.index.seqno.SequenceNumbersService;
4759
import org.elasticsearch.index.shard.IndexShard;
@@ -60,6 +72,7 @@
6072
import java.io.IOException;
6173
import java.nio.file.Path;
6274
import java.util.ArrayList;
75+
import java.util.Arrays;
6376
import java.util.Collections;
6477
import java.util.List;
6578
import java.util.concurrent.atomic.AtomicBoolean;
@@ -136,6 +149,72 @@ public void close() throws IOException {
136149
IOUtils.close(reader, store, targetStore);
137150
}
138151

152+
/**
 * Checks how many operations {@code sendSnapshot} reports for a translog snapshot that mixes operations created with
 * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} and operations created with consecutive sequence numbers starting at zero.
 * The starting sequence number is randomly either {@code UNASSIGNED_SEQ_NO} (every operation is expected to be counted) or a
 * value in {@code [0, 16]} (only operations at or above that sequence number are expected to be counted).
 */
public void testSendSnapshotSendsOps() throws IOException {
153+
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
154+
final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
155+
// randomly exercise both branches asserted at the end of this test
final long startingSeqNo = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
156+
final StartRecoveryRequest request = new StartRecoveryRequest(
157+
shardId,
158+
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
159+
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
160+
null,
161+
randomBoolean(),
162+
randomNonNegativeLong(),
163+
randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
164+
// the shard and the recovery target are mocks; the shard only needs to report the STARTED state
final IndexShard shard = mock(IndexShard.class);
165+
when(shard.state()).thenReturn(IndexShardState.STARTED);
166+
final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
167+
final RecoverySourceHandler handler =
168+
new RecoverySourceHandler(shard, recoveryTarget, request, () -> 0L, e -> () -> {}, fileChunkSizeInBytes, Settings.EMPTY);
169+
final List<Translog.Operation> operations = new ArrayList<>();
170+
// first batch: operations created with UNASSIGNED_SEQ_NO
final int initialNumberOfDocs = randomIntBetween(16, 64);
171+
for (int i = 0; i < initialNumberOfDocs; i++) {
172+
final Engine.Index index = getIndex(Integer.toString(i));
173+
operations.add(new Translog.Index(index, new Engine.IndexResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true)));
174+
}
175+
// second batch: operations created with the values 0 .. numberOfDocsWithValidSequenceNumbers - 1
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64);
176+
for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) {
177+
final Engine.Index index = getIndex(Integer.toString(i));
178+
operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
179+
}
180+
// trailing null so that next() returns null after the last real operation
// NOTE(review): presumably null is how a snapshot signals exhaustion to sendSnapshot -- confirm against Translog.Snapshot
operations.add(null);
181+
int totalOperations = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
182+
private int counter = 0;
183+
184+
@Override
185+
public int totalOperations() {
186+
// do not count the trailing null entry
return operations.size() - 1;
187+
}
188+
189+
@Override
190+
public Translog.Operation next() throws IOException {
191+
return operations.get(counter++);
192+
}
193+
});
194+
if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
195+
// no starting sequence number: both batches are expected to be counted
assertThat(totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers));
196+
} else {
197+
// only the second batch minus the startingSeqNo values below the starting sequence number is expected
assertThat(totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo)));
198+
}
199+
}
200+
201+
/**
 * Builds a minimal {@link Engine.Index} operation for a document of type {@code "test"} with the given id.
 *
 * @param id the document id; combined with the type to form the {@code _uid} value
 * @return an index operation whose term is {@code _uid} with the parsed document's uid
 */
private Engine.Index getIndex(final String id) {
202+
final String type = "test";
203+
final ParseContext.Document document = new ParseContext.Document();
204+
document.add(new TextField("test", "test", Field.Store.YES));
205+
final Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
206+
final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY);
207+
final SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
208+
// add the uid, version and sequence-id fields next to the single text field
document.add(uidField);
209+
document.add(versionField);
210+
document.add(seqID.seqNo);
211+
document.add(seqID.seqNoDocValue);
212+
document.add(seqID.primaryTerm);
213+
// one-byte placeholder source
final BytesReference source = new BytesArray(new byte[] { 1 });
214+
final ParsedDocument doc = new ParsedDocument(versionField, seqID, id, type, null, Arrays.asList(document), source, null);
215+
return new Engine.Index(new Term("_uid", doc.uid()), doc);
216+
}
217+
139218
public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
140219
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
141220
put("indices.recovery.concurrent_small_file_streams", 1).build();

qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,17 @@
2828
import org.elasticsearch.cluster.metadata.IndexMetaData;
2929
import org.elasticsearch.common.Strings;
3030
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.xcontent.XContentHelper;
3132
import org.elasticsearch.common.xcontent.XContentType;
33+
import org.elasticsearch.common.xcontent.json.JsonXContent;
3234
import org.elasticsearch.index.IndexSettings;
3335
import org.elasticsearch.index.seqno.SeqNoStats;
3436
import org.elasticsearch.index.seqno.SequenceNumbersService;
3537
import org.elasticsearch.test.rest.ESRestTestCase;
3638
import org.elasticsearch.test.rest.yaml.ObjectPath;
3739

3840
import java.io.IOException;
41+
import java.io.InputStream;
3942
import java.nio.charset.StandardCharsets;
4043
import java.util.ArrayList;
4144
import java.util.Collections;
@@ -95,7 +98,8 @@ public void testSeqNoCheckpoints() throws Exception {
9598
Nodes nodes = buildNodeAndVersions();
9699
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
97100
logger.info("cluster discovered: {}", nodes.toString());
98-
final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
101+
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
102+
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
99103
Settings.Builder settings = Settings.builder()
100104
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
101105
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
@@ -111,26 +115,60 @@ public void testSeqNoCheckpoints() throws Exception {
111115
createIndex(index, settings.build());
112116
try (RestClient newNodeClient = buildClient(restClientSettings(),
113117
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
114-
int numDocs = indexDocs(index, 0, randomInt(5));
118+
int numDocs = 0;
119+
final int numberOfInitialDocs = 1 + randomInt(5);
120+
logger.info("indexing [{}] docs initially", numberOfInitialDocs);
121+
numDocs += indexDocs(index, 0, numberOfInitialDocs);
115122
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);
116-
117123
logger.info("allowing shards on all nodes");
118124
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
119125
ensureGreen();
120-
logger.info("indexing some more docs");
121-
numDocs += indexDocs(index, numDocs, randomInt(5));
126+
assertOK(client().performRequest("POST", index + "/_refresh"));
127+
for (final String bwcName : bwcNamesList) {
128+
assertCount(index, "_only_nodes:" + bwcName, numDocs);
129+
}
130+
final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5);
131+
logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes);
132+
numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes);
122133
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);
123-
logger.info("moving primary to new node");
124134
Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
135+
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
125136
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
126137
ensureGreen();
127-
logger.info("indexing some more docs");
128-
int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5));
129-
numDocs += numDocsOnNewPrimary;
138+
int numDocsOnNewPrimary = 0;
139+
final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5);
140+
logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
141+
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary);
142+
numDocs += numberOfDocsAfterMovingPrimary;
143+
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
144+
/*
145+
* Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in
146+
* the recovery code.
147+
*/
148+
logger.info("setting number of replicas to 0");
149+
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0));
150+
final int numberOfDocsAfterDroppingReplicas = 1 + randomInt(5);
151+
logger.info("indexing [{}] docs after setting number of replicas to 0", numberOfDocsAfterDroppingReplicas);
152+
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterDroppingReplicas);
153+
numDocs += numberOfDocsAfterDroppingReplicas;
154+
logger.info("setting number of replicas to 1");
155+
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
156+
ensureGreen();
157+
assertOK(client().performRequest("POST", index + "/_refresh"));
158+
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
159+
assertCount(index, "_primary", numDocs);
160+
assertCount(index, "_replica", numDocs);
130161
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
131162
}
132163
}
133164

165+
/**
 * Issues {@code GET <index>/_count} with the given {@code preference} request parameter and asserts that the request succeeded
 * and that the {@code count} value in the response equals the expected count.
 *
 * @param index         the index to count documents in
 * @param preference    value sent as the {@code preference} request parameter
 * @param expectedCount the count the response is expected to report
 * @throws IOException if performing the request or reading the response fails
 */
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
166+
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
167+
assertOK(response);
168+
// the "count" value is converted via toString() and parsed as an int before comparing
final int actualCount = Integer.parseInt(objectPath(response).evaluate("count").toString());
169+
assertThat(actualCount, equalTo(expectedCount));
170+
}
171+
134172
private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception {
135173
assertBusy(() -> {
136174
try {

0 commit comments

Comments
 (0)