Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
Expand All @@ -59,7 +58,6 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand All @@ -86,8 +84,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;

public class InternalEngine extends Engine {

/**
Expand Down Expand Up @@ -121,6 +117,7 @@ public class InternalEngine extends Engine {
private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
Expand Down Expand Up @@ -326,32 +323,20 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
}

private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
final long maxSeqNo;
try (IndexReader reader = DirectoryReader.open(writer)) {
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
if (stats != null) {
maxSeqNo = (long) stats.getMaxValue();
} else {
maxSeqNo = NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
maxSeqNo = Long.parseLong(entry.getValue());
}
}

final Map<String, String> commitUserData = writer.getCommitData();

final long localCheckpoint;
if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
} else {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
}

final long globalCheckpoint;
if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY));
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

Expand Down Expand Up @@ -1336,10 +1321,26 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
}

if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
logger.trace("committing writer with commit data (max_seq_no excluded) [{}]", commitData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we log this after the commit so we have everything?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do, in doing this, I noticed there is an issue in how the commit data is set - anytime the iterator() is called, it will recompute the maxSeqNo based on the current value of what the SequenceNumbersService returns. We only want this done once so each subsequent time we call writer.getLiveCommitData(), we get the same value that went into the commit. I'm fixing this and will push up a new commit

}

indexWriter.setCommitData(commitData);
indexWriter.setLiveCommitData(() -> {
/**
* The user data in the commitData map contains data that must be evaluated *before*
* Lucene flushes segments, including the local and global checkpoints amongst other values.
* The maximum sequence number is different - we never want the maximum sequence number to be
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk
* of re-using a sequence number for two different documents when restoring from this commit
* point and subsequently writing new documents to the index. Since we only know which Lucene
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
* data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> deferredCommitData = new HashMap<>(commitData.size() + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be simpler to just capture the local and global checkpoints before we start and build one map here. This way we don't need two maps.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

deferredCommitData.putAll(commitData);
deferredCommitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
return deferredCommitData.entrySet().iterator();
});
writer.commit();
} catch (Exception ex) {
try {
Expand Down Expand Up @@ -1395,7 +1396,8 @@ public MergeStats getMergeStats() {
public SequenceNumbersService seqNoService() {
return seqNoService;
}
@Override

@Override
public DocsStats getDocStats() {
final int numDocs = indexWriter.numDocs();
final int maxDoc = indexWriter.maxDoc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public long generateSeqNo() {
return localCheckpointService.generateSeqNo();
}

/**
* Gets the maximum sequence number seen so far. See {@link LocalCheckpointService#getMaxSeqNo()} for details.
*/
public long getMaxSeqNo() {
return localCheckpointService.getMaxSeqNo();
}

/**
* marks the given seqNo as completed. See {@link LocalCheckpointService#markSeqNoAsCompleted(long)}
* more details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ public SequenceNumbersService seqNoService() {
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)),
equalTo(SequenceNumbersService.NO_OPS_PERFORMED));

maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024));
localCheckpoint.set(
Expand All @@ -608,6 +612,8 @@ public SequenceNumbersService seqNoService() {
assertThat(
Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint.get()));
assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
} finally {
IOUtils.close(engine);
}
Expand Down Expand Up @@ -1625,6 +1631,7 @@ public void testSeqNoAndCheckpoints() throws IOException {
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long replicaLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
InternalEngine initialEngine = null;

try {
Expand Down Expand Up @@ -1653,6 +1660,7 @@ public void testSeqNoAndCheckpoints() throws IOException {

if (rarely()) {
localCheckpoint = primarySeqNo;
maxSeqNo = primarySeqNo;
globalCheckpoint = replicaLocalCheckpoint;
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
initialEngine.flush(true, true);
Expand All @@ -1661,6 +1669,7 @@ public void testSeqNoAndCheckpoints() throws IOException {

initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();

assertEquals(primarySeqNo, initialEngine.seqNoService().getMaxSeqNo());
assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(initialEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint));
Expand All @@ -1671,6 +1680,9 @@ public void testSeqNoAndCheckpoints() throws IOException {
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
equalTo(maxSeqNo));

} finally {
IOUtils.close(initialEngine);
Expand All @@ -1681,12 +1693,17 @@ public void testSeqNoAndCheckpoints() throws IOException {
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine.recoverFromTranslog();

assertEquals(primarySeqNo, recoveringEngine.seqNoService().getMaxSeqNo());
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
// after recovering from translog, all docs have been flushed to Lucene segments, so check against primarySeqNo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I follow this comment? can you elaborate?

equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primarySeqNo + 1));
Expand Down