Skip to content

Commit a014f55

Browse files
author
Ali Beyad
committed
Sequence Numbers Lucene commit data uses Iterable interface
Sequence number related data (maximum sequence number, local checkpoint, and global checkpoint) gets stored in Lucene on each commit. The logical place to store this data is on each Lucene commit's user commit data structure (see IndexWriter#setCommitData and the new version IndexWriter#setLiveCommitData). However, previously we did not store the maximum sequence number in the commit data because the commit data got copied over before the Lucene IndexWriter flushed the documents to segments in the commit. This means that between the time that the commit data was set on the IndexWriter and the time that the IndexWriter completes the commit, documents with higher sequence numbers could have entered the commit. Hence, we would use FieldStats on the _seq_no field in the documents to get the maximum sequence number value, but this suffers the drawback that if the last sequence number in the commit corresponded to a delete document action, that sequence number would not show up in FieldStats as there would be no corresponding document in Lucene. In Lucene 6.2, the commit data was changed to take an Iterable interface, so that the commit data can be calculated and retrieved *after* all documents have been flushed, while the commit data itself is being set on the Lucene commit. This commit changes max_seq_no so it is stored in the commit data instead of being calculated from FieldStats, taking advantage of the deferred calculation of the max_seq_no through passing an Iterable that dynamically sets the iterator data.
1 parent 27eab74 commit a014f55

File tree

3 files changed

+56
-30
lines changed

3 files changed

+56
-30
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.lucene.util.InfoStream;
4444
import org.elasticsearch.ExceptionsHelper;
4545
import org.elasticsearch.Version;
46-
import org.elasticsearch.action.fieldstats.FieldStats;
4746
import org.elasticsearch.action.index.IndexRequest;
4847
import org.elasticsearch.common.Nullable;
4948
import org.elasticsearch.common.lease.Releasable;
@@ -59,7 +58,6 @@
5958
import org.elasticsearch.index.IndexSettings;
6059
import org.elasticsearch.index.VersionType;
6160
import org.elasticsearch.index.mapper.Uid;
62-
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
6361
import org.elasticsearch.index.merge.MergeStats;
6462
import org.elasticsearch.index.merge.OnGoingMerge;
6563
import org.elasticsearch.index.seqno.SeqNoStats;
@@ -86,8 +84,6 @@
8684
import java.util.concurrent.locks.ReentrantLock;
8785
import java.util.function.Function;
8886

89-
import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;
90-
9187
public class InternalEngine extends Engine {
9288

9389
/**
@@ -121,6 +117,7 @@ public class InternalEngine extends Engine {
121117
private final SequenceNumbersService seqNoService;
122118
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
123119
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
120+
static final String MAX_SEQ_NO = "max_seq_no";
124121

125122
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
126123
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -326,32 +323,20 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
326323
}
327324

328325
private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
329-
final long maxSeqNo;
330-
try (IndexReader reader = DirectoryReader.open(writer)) {
331-
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
332-
if (stats != null) {
333-
maxSeqNo = (long) stats.getMaxValue();
334-
} else {
335-
maxSeqNo = NO_OPS_PERFORMED;
326+
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
327+
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
328+
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
329+
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
330+
final String key = entry.getKey();
331+
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
332+
localCheckpoint = Long.parseLong(entry.getValue());
333+
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
334+
globalCheckpoint = Long.parseLong(entry.getValue());
335+
} else if (key.equals(MAX_SEQ_NO)) {
336+
maxSeqNo = Long.parseLong(entry.getValue());
336337
}
337338
}
338339

339-
final Map<String, String> commitUserData = writer.getCommitData();
340-
341-
final long localCheckpoint;
342-
if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) {
343-
localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
344-
} else {
345-
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
346-
}
347-
348-
final long globalCheckpoint;
349-
if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) {
350-
globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY));
351-
} else {
352-
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
353-
}
354-
355340
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
356341
}
357342

@@ -1336,10 +1321,26 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
13361321
}
13371322

13381323
if (logger.isTraceEnabled()) {
1339-
logger.trace("committing writer with commit data [{}]", commitData);
1324+
logger.trace("committing writer with commit data (max_seq_no excluded) [{}]", commitData);
13401325
}
13411326

1342-
indexWriter.setCommitData(commitData);
1327+
indexWriter.setLiveCommitData(() -> {
1328+
/**
1329+
* The user data in the commitData map contains data that must be evaluated *before*
1330+
* Lucene flushes segments, including the local and global checkpoints amongst other values.
1331+
* The maximum sequence number is different - we never want the maximum sequence number to be
1332+
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk
1333+
* of re-using a sequence number for two different documents when restoring from this commit
1334+
* point and subsequently writing new documents to the index. Since we only know which Lucene
1335+
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes
1336+
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
1337+
* data iterator (which occurs after all documents have been flushed to Lucene).
1338+
*/
1339+
final Map<String, String> deferredCommitData = new HashMap<>(commitData.size() + 1);
1340+
deferredCommitData.putAll(commitData);
1341+
deferredCommitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
1342+
return deferredCommitData.entrySet().iterator();
1343+
});
13431344
writer.commit();
13441345
} catch (Exception ex) {
13451346
try {
@@ -1395,7 +1396,8 @@ public MergeStats getMergeStats() {
13951396
public SequenceNumbersService seqNoService() {
13961397
return seqNoService;
13971398
}
1398-
@Override
1399+
1400+
@Override
13991401
public DocsStats getDocStats() {
14001402
final int numDocs = indexWriter.numDocs();
14011403
final int maxDoc = indexWriter.maxDoc();

core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ public long generateSeqNo() {
8181
return localCheckpointService.generateSeqNo();
8282
}
8383

84+
/**
85+
* Gets the maximum sequence number seen so far. See {@link LocalCheckpointService#getMaxSeqNo()} for details.
86+
*/
87+
public long getMaxSeqNo() {
88+
return localCheckpointService.getMaxSeqNo();
89+
}
90+
8491
/**
8592
* marks the given seqNo as completed. See {@link LocalCheckpointService#markSeqNoAsCompleted(long)}
8693
* more details

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,10 @@ public SequenceNumbersService seqNoService() {
583583
assertThat(
584584
Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
585585
equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
586+
assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
587+
assertThat(
588+
Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)),
589+
equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
586590

587591
maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024));
588592
localCheckpoint.set(
@@ -608,6 +612,8 @@ public SequenceNumbersService seqNoService() {
608612
assertThat(
609613
Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
610614
equalTo(globalCheckpoint.get()));
615+
assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
616+
assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
611617
} finally {
612618
IOUtils.close(engine);
613619
}
@@ -1625,6 +1631,7 @@ public void testSeqNoAndCheckpoints() throws IOException {
16251631
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
16261632
long replicaLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
16271633
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
1634+
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
16281635
InternalEngine initialEngine = null;
16291636

16301637
try {
@@ -1653,6 +1660,7 @@ public void testSeqNoAndCheckpoints() throws IOException {
16531660

16541661
if (rarely()) {
16551662
localCheckpoint = primarySeqNo;
1663+
maxSeqNo = primarySeqNo;
16561664
globalCheckpoint = replicaLocalCheckpoint;
16571665
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
16581666
initialEngine.flush(true, true);
@@ -1661,6 +1669,7 @@ public void testSeqNoAndCheckpoints() throws IOException {
16611669

16621670
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
16631671

1672+
assertEquals(primarySeqNo, initialEngine.seqNoService().getMaxSeqNo());
16641673
assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
16651674
assertThat(initialEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
16661675
assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint));
@@ -1671,6 +1680,9 @@ public void testSeqNoAndCheckpoints() throws IOException {
16711680
assertThat(
16721681
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
16731682
equalTo(globalCheckpoint));
1683+
assertThat(
1684+
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
1685+
equalTo(maxSeqNo));
16741686

16751687
} finally {
16761688
IOUtils.close(initialEngine);
@@ -1681,12 +1693,17 @@ public void testSeqNoAndCheckpoints() throws IOException {
16811693
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
16821694
recoveringEngine.recoverFromTranslog();
16831695

1696+
assertEquals(primarySeqNo, recoveringEngine.seqNoService().getMaxSeqNo());
16841697
assertThat(
16851698
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
16861699
equalTo(primarySeqNo));
16871700
assertThat(
16881701
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
16891702
equalTo(globalCheckpoint));
1703+
assertThat(
1704+
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
1705+
// after recovering from translog, all docs have been flushed to Lucene segments, so check against primarySeqNo
1706+
equalTo(primarySeqNo));
16901707
assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
16911708
assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
16921709
assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primarySeqNo + 1));

0 commit comments

Comments
 (0)