Skip to content

Commit 27532b1

Browse files
author
Ali Beyad
committed
Adds a test for concurrently indexing and committing segments
to Lucene, ensuring the sequence number related commit data in each Lucene commit point matches the invariants of localCheckpoint <= highest sequence number in commit <= maxSeqNo
1 parent 2656776 commit 27532b1

File tree

2 files changed

+132
-2
lines changed

2 files changed

+132
-2
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1341,7 +1341,7 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
13411341
}
13421342
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
13431343
if (logger.isTraceEnabled()) {
1344-
logger.trace("committed writer with commit data [{}]", commitDataAsMap(writer));
1344+
logger.trace("committed writer with commit data [{}]", commitData);
13451345
}
13461346
return commitData.entrySet().iterator();
13471347
});

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,19 @@
3232
import org.apache.lucene.document.NumericDocValuesField;
3333
import org.apache.lucene.document.TextField;
3434
import org.apache.lucene.index.DirectoryReader;
35+
import org.apache.lucene.index.IndexCommit;
36+
import org.apache.lucene.index.IndexReader;
3537
import org.apache.lucene.index.IndexWriterConfig;
3638
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
39+
import org.apache.lucene.index.LeafReader;
40+
import org.apache.lucene.index.LeafReaderContext;
3741
import org.apache.lucene.index.LiveIndexWriterConfig;
3842
import org.apache.lucene.index.LogByteSizeMergePolicy;
3943
import org.apache.lucene.index.LogDocMergePolicy;
4044
import org.apache.lucene.index.MergePolicy;
45+
import org.apache.lucene.index.NoDeletionPolicy;
4146
import org.apache.lucene.index.NoMergePolicy;
47+
import org.apache.lucene.index.NumericDocValues;
4248
import org.apache.lucene.index.SnapshotDeletionPolicy;
4349
import org.apache.lucene.index.Term;
4450
import org.apache.lucene.index.TieredMergePolicy;
@@ -51,10 +57,13 @@
5157
import org.apache.lucene.store.AlreadyClosedException;
5258
import org.apache.lucene.store.Directory;
5359
import org.apache.lucene.store.MockDirectoryWrapper;
60+
import org.apache.lucene.util.Bits;
61+
import org.apache.lucene.util.FixedBitSet;
5462
import org.apache.lucene.util.IOUtils;
5563
import org.apache.lucene.util.TestUtil;
5664
import org.elasticsearch.ElasticsearchException;
5765
import org.elasticsearch.Version;
66+
import org.elasticsearch.action.fieldstats.FieldStats;
5867
import org.elasticsearch.action.index.IndexRequest;
5968
import org.elasticsearch.action.support.TransportActions;
6069
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -87,6 +96,7 @@
8796
import org.elasticsearch.index.mapper.RootObjectMapper;
8897
import org.elasticsearch.index.mapper.SourceFieldMapper;
8998
import org.elasticsearch.index.mapper.UidFieldMapper;
99+
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
90100
import org.elasticsearch.index.seqno.SequenceNumbersService;
91101
import org.elasticsearch.index.shard.DocsStats;
92102
import org.elasticsearch.index.shard.IndexSearcherWrapper;
@@ -141,6 +151,7 @@
141151
import static org.hamcrest.Matchers.greaterThan;
142152
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
143153
import static org.hamcrest.Matchers.hasKey;
154+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
144155
import static org.hamcrest.Matchers.not;
145156
import static org.hamcrest.Matchers.notNullValue;
146157
import static org.hamcrest.Matchers.nullValue;
@@ -297,6 +308,13 @@ protected InternalEngine createEngine(IndexSettings indexSettings, Store store,
297308

298309
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
299310
long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) {
311+
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(),
312+
maxUnsafeAutoIdTimestamp, refreshListener);
313+
}
314+
315+
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
316+
SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp,
317+
ReferenceManager.RefreshListener refreshListener) {
300318
IndexWriterConfig iwc = newIndexWriterConfig();
301319
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
302320
final EngineConfig.OpenMode openMode;
@@ -315,7 +333,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
315333
// we don't need to notify anybody in this test
316334
}
317335
};
318-
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(),
336+
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy,
319337
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
320338
new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(),
321339
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener,
@@ -1748,6 +1766,118 @@ public void testSeqNoAndCheckpoints() throws IOException {
17481766
}
17491767
}
17501768

1769+
// this test writes documents to the engine while concurrently flushing/commit
1770+
// and ensuring that the commit points contain the correct sequence number data
1771+
public void testConcurrentWritesAndCommits() throws Exception {
1772+
try (final Store store = createStore();
1773+
final InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(),
1774+
new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE),
1775+
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
1776+
1777+
final int numIndexingThreads = randomIntBetween(4, 7);
1778+
final int numDocsPerThread = randomIntBetween(500, 1000);
1779+
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
1780+
final List<Thread> indexingThreads = new ArrayList<>();
1781+
final List<AtomicBoolean> threadStatuses = new ArrayList<>();
1782+
for (int i = 0; i < numIndexingThreads; i++) {
1783+
threadStatuses.add(new AtomicBoolean());
1784+
}
1785+
// create N indexing threads to index documents simultaneously
1786+
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
1787+
final int threadIdx = threadNum;
1788+
Thread indexingThread = new Thread() {
1789+
@Override
1790+
public void run() {
1791+
try {
1792+
barrier.await(); // wait for both threads to start at the same time
1793+
// index a random number of docs
1794+
for (int i = 0; i < numDocsPerThread; i++) {
1795+
final String id = "thread" + threadIdx + "#" + i;
1796+
ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null);
1797+
engine.index(new Engine.Index(newUid(id), doc));
1798+
}
1799+
} catch (Exception e) {
1800+
throw new RuntimeException(e);
1801+
} finally {
1802+
threadStatuses.get(threadIdx).set(true); // signal that this thread is done indexing
1803+
}
1804+
}
1805+
};
1806+
indexingThreads.add(indexingThread);
1807+
}
1808+
1809+
// start the indexing threads
1810+
for (Thread thread : indexingThreads) {
1811+
thread.start();
1812+
}
1813+
barrier.await(); // wait for indexing threads to all be ready to start
1814+
1815+
// create random commit points
1816+
boolean doneIndexing;
1817+
do {
1818+
doneIndexing = threadStatuses.stream().filter(status -> status.get() == false).count() == 0;
1819+
engine.flush(); // flush and commit
1820+
} while (doneIndexing == false);
1821+
1822+
// now, verify all the commits have the correct docs according to the user commit data
1823+
long prevLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
1824+
for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) {
1825+
Map<String, String> userData = commit.getUserData();
1826+
long localCheckpoint = userData.containsKey(InternalEngine.LOCAL_CHECKPOINT_KEY) ?
1827+
Long.parseLong(userData.get(InternalEngine.LOCAL_CHECKPOINT_KEY)) :
1828+
SequenceNumbersService.NO_OPS_PERFORMED;
1829+
long maxSeqNo = userData.containsKey(InternalEngine.MAX_SEQ_NO) ?
1830+
Long.parseLong(userData.get(InternalEngine.MAX_SEQ_NO)) :
1831+
SequenceNumbersService.UNASSIGNED_SEQ_NO;
1832+
assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint)) ; // local checkpoint shouldn't go backwards
1833+
try (IndexReader reader = DirectoryReader.open(commit)) {
1834+
FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
1835+
final long highestSeqNo;
1836+
if (stats != null) {
1837+
highestSeqNo = (long) stats.getMaxValue();
1838+
} else {
1839+
highestSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
1840+
}
1841+
// make sure localCheckpoint <= highest seq no found <= maxSeqNo
1842+
assertThat(highestSeqNo, greaterThanOrEqualTo(localCheckpoint));
1843+
assertThat(highestSeqNo, lessThanOrEqualTo(maxSeqNo));
1844+
// make sure all sequence numbers up to and including the local checkpoint are in the index
1845+
FixedBitSet seqNosBitSet = getSeqNosSet(reader, highestSeqNo);
1846+
for (int i = 0; i <= localCheckpoint; i++) {
1847+
assertTrue("local checkpoint [" + localCheckpoint + "], _seq_no [" + i + "] should be indexed",
1848+
seqNosBitSet.get(i));
1849+
}
1850+
}
1851+
prevLocalCheckpoint = localCheckpoint;
1852+
}
1853+
}
1854+
}
1855+
1856+
private static FixedBitSet getSeqNosSet(final IndexReader reader, final long highestSeqNo) throws IOException {
1857+
// _seq_no are stored as doc values for the time being, so this is how we get them
1858+
// (as opposed to using an IndexSearcher or IndexReader)
1859+
final FixedBitSet bitSet = new FixedBitSet((int) highestSeqNo + 1);
1860+
final List<LeafReaderContext> leaves = reader.leaves();
1861+
if (leaves.isEmpty()) {
1862+
return bitSet;
1863+
}
1864+
1865+
for (int i = 0; i < leaves.size(); i++) {
1866+
final LeafReader leaf = leaves.get(i).reader();
1867+
final NumericDocValues values = leaf.getNumericDocValues(SeqNoFieldMapper.NAME);
1868+
if (values == null) {
1869+
continue;
1870+
}
1871+
final Bits bits = leaf.getLiveDocs();
1872+
for (int docID = 0; docID < leaf.maxDoc(); docID++) {
1873+
if (bits == null || bits.get(docID)) {
1874+
bitSet.set((int) values.get(docID));
1875+
}
1876+
}
1877+
}
1878+
return bitSet;
1879+
}
1880+
17511881
// #8603: make sure we can separately log IFD's messages
17521882
public void testIndexWriterIFDInfoStream() throws IllegalAccessException {
17531883
assumeFalse("who tests the tester?", VERBOSE);

0 commit comments

Comments
 (0)