|
20 | 20 |
|
21 | 21 | import org.apache.lucene.document.Document; |
22 | 22 | import org.apache.lucene.document.Field; |
| 23 | +import org.apache.lucene.document.NumericDocValuesField; |
23 | 24 | import org.apache.lucene.document.StringField; |
24 | 25 | import org.apache.lucene.document.TextField; |
25 | 26 | import org.apache.lucene.index.CorruptIndexException; |
26 | 27 | import org.apache.lucene.index.DirectoryReader; |
27 | 28 | import org.apache.lucene.index.IndexCommit; |
28 | 29 | import org.apache.lucene.index.IndexReader; |
29 | 30 | import org.apache.lucene.index.RandomIndexWriter; |
| 31 | +import org.apache.lucene.index.Term; |
30 | 32 | import org.apache.lucene.store.BaseDirectoryWrapper; |
31 | 33 | import org.apache.lucene.store.Directory; |
32 | 34 | import org.apache.lucene.store.IOContext; |
|
35 | 37 | import org.elasticsearch.Version; |
36 | 38 | import org.elasticsearch.cluster.metadata.IndexMetaData; |
37 | 39 | import org.elasticsearch.cluster.node.DiscoveryNode; |
| 40 | +import org.elasticsearch.common.bytes.BytesArray; |
| 41 | +import org.elasticsearch.common.bytes.BytesReference; |
38 | 42 | import org.elasticsearch.common.io.FileSystemUtils; |
39 | 43 | import org.elasticsearch.common.lease.Releasable; |
40 | 44 | import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; |
| 45 | +import org.elasticsearch.common.lucene.uid.Versions; |
41 | 46 | import org.elasticsearch.common.settings.ClusterSettings; |
42 | 47 | import org.elasticsearch.common.settings.Settings; |
43 | 48 | import org.elasticsearch.index.IndexSettings; |
| 49 | +import org.elasticsearch.index.engine.Engine; |
44 | 50 | import org.elasticsearch.index.engine.SegmentsStats; |
| 51 | +import org.elasticsearch.index.mapper.Mapping; |
| 52 | +import org.elasticsearch.index.mapper.ParseContext; |
| 53 | +import org.elasticsearch.index.mapper.ParsedDocument; |
| 54 | +import org.elasticsearch.index.mapper.SeqNoFieldMapper; |
| 55 | +import org.elasticsearch.index.mapper.Uid; |
| 56 | +import org.elasticsearch.index.mapper.UidFieldMapper; |
45 | 57 | import org.elasticsearch.index.seqno.SeqNoStats; |
46 | 58 | import org.elasticsearch.index.seqno.SequenceNumbersService; |
47 | 59 | import org.elasticsearch.index.shard.IndexShard; |
|
60 | 72 | import java.io.IOException; |
61 | 73 | import java.nio.file.Path; |
62 | 74 | import java.util.ArrayList; |
| 75 | +import java.util.Arrays; |
63 | 76 | import java.util.Collections; |
64 | 77 | import java.util.List; |
65 | 78 | import java.util.concurrent.atomic.AtomicBoolean; |
@@ -136,6 +149,72 @@ public void close() throws IOException { |
136 | 149 | IOUtils.close(reader, store, targetStore); |
137 | 150 | } |
138 | 151 |
|
| 152 | + public void testSendSnapshotSendsOps() throws IOException { |
| 153 | + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); |
| 154 | + final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt(); |
| 155 | + final long startingSeqNo = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16); |
| 156 | + final StartRecoveryRequest request = new StartRecoveryRequest( |
| 157 | + shardId, |
| 158 | + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), |
| 159 | + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), |
| 160 | + null, |
| 161 | + randomBoolean(), |
| 162 | + randomNonNegativeLong(), |
| 163 | + randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong()); |
| 164 | + final IndexShard shard = mock(IndexShard.class); |
| 165 | + when(shard.state()).thenReturn(IndexShardState.STARTED); |
| 166 | + final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class); |
| 167 | + final RecoverySourceHandler handler = |
| 168 | + new RecoverySourceHandler(shard, recoveryTarget, request, () -> 0L, e -> () -> {}, fileChunkSizeInBytes, Settings.EMPTY); |
| 169 | + final List<Translog.Operation> operations = new ArrayList<>(); |
| 170 | + final int initialNumberOfDocs = randomIntBetween(16, 64); |
| 171 | + for (int i = 0; i < initialNumberOfDocs; i++) { |
| 172 | + final Engine.Index index = getIndex(Integer.toString(i)); |
| 173 | + operations.add(new Translog.Index(index, new Engine.IndexResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true))); |
| 174 | + } |
| 175 | + final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64); |
| 176 | + for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) { |
| 177 | + final Engine.Index index = getIndex(Integer.toString(i)); |
| 178 | + operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true))); |
| 179 | + } |
| 180 | + operations.add(null); |
| 181 | + int totalOperations = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() { |
| 182 | + private int counter = 0; |
| 183 | + |
| 184 | + @Override |
| 185 | + public int totalOperations() { |
| 186 | + return operations.size() - 1; |
| 187 | + } |
| 188 | + |
| 189 | + @Override |
| 190 | + public Translog.Operation next() throws IOException { |
| 191 | + return operations.get(counter++); |
| 192 | + } |
| 193 | + }); |
| 194 | + if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { |
| 195 | + assertThat(totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers)); |
| 196 | + } else { |
| 197 | + assertThat(totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo))); |
| 198 | + } |
| 199 | + } |
| 200 | + |
| 201 | + private Engine.Index getIndex(final String id) { |
| 202 | + final String type = "test"; |
| 203 | + final ParseContext.Document document = new ParseContext.Document(); |
| 204 | + document.add(new TextField("test", "test", Field.Store.YES)); |
| 205 | + final Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); |
| 206 | + final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY); |
| 207 | + final SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); |
| 208 | + document.add(uidField); |
| 209 | + document.add(versionField); |
| 210 | + document.add(seqID.seqNo); |
| 211 | + document.add(seqID.seqNoDocValue); |
| 212 | + document.add(seqID.primaryTerm); |
| 213 | + final BytesReference source = new BytesArray(new byte[] { 1 }); |
| 214 | + final ParsedDocument doc = new ParsedDocument(versionField, seqID, id, type, null, Arrays.asList(document), source, null); |
| 215 | + return new Engine.Index(new Term("_uid", doc.uid()), doc); |
| 216 | + } |
| 217 | + |
139 | 218 | public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { |
140 | 219 | Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1). |
141 | 220 | put("indices.recovery.concurrent_small_file_streams", 1).build(); |
|
0 commit comments