Skip to content

Commit 1d63334

Browse files
author
Ali Beyad
committed
improvements to iterating over commit data (and better safety guarantees)
1 parent a014f55 commit 1d63334

File tree

2 files changed

+76
-31
lines changed

2 files changed

+76
-31
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.io.IOException;
7575
import java.util.Arrays;
7676
import java.util.HashMap;
77+
import java.util.Iterator;
7778
import java.util.List;
7879
import java.util.Map;
7980
import java.util.Set;
@@ -282,7 +283,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr
282283
boolean success = false;
283284
try {
284285
commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
285-
? writer.getCommitData().get(SYNC_COMMIT_ID) : null);
286+
? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
286287
success = true;
287288
} finally {
288289
if (success == false) {
@@ -307,7 +308,7 @@ public Translog getTranslog() {
307308
private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) throws IOException {
308309
// commit on a just opened writer will commit even if there are no changes done to it
309310
// we rely on that for the commit data translog id key
310-
final Map<String, String> commitUserData = writer.getCommitData();
311+
final Map<String, String> commitUserData = commitDataAsMap(writer);
311312
if (commitUserData.containsKey("translog_id")) {
312313
assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID";
313314
return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id")));
@@ -322,6 +323,11 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
322323
return null;
323324
}
324325

326+
// package private for testing
327+
SeqNoStats loadSeqNoStatsFromCommit() throws IOException {
328+
return loadSeqNoStatsFromCommit(indexWriter);
329+
}
330+
325331
private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
326332
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
327333
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
@@ -1308,40 +1314,52 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
13081314
ensureCanFlush();
13091315
try {
13101316
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
1311-
final Map<String, String> commitData = new HashMap<>(5);
13121317

1313-
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
1314-
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
1318+
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
1319+
final String translogUUID = translogGeneration.translogUUID;
1320+
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
1321+
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());
13151322

1316-
commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint()));
1317-
commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint()));
1323+
writer.setLiveCommitData(new Iterable<Map.Entry<String, String>>() {
1324+
// save the max seq no the first time its computed, so subsequent iterations don't recompute,
1325+
// potentially getting a different value
1326+
private String computedMaxSeqNoEntry = null;
13181327

1319-
if (syncId != null) {
1320-
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
1321-
}
1328+
@Override
1329+
public Iterator<Map.Entry<String, String>> iterator() {
1330+
/**
1331+
* The user data captured above (e.g. local/global checkpoints) contains data that must be evaluated
1332+
* *before* Lucene flushes segments, including the local and global checkpoints amongst other values.
1333+
* The maximum sequence number is different - we never want the maximum sequence number to be
1334+
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk
1335+
* of re-using a sequence number for two different documents when restoring from this commit
1336+
* point and subsequently writing new documents to the index. Since we only know which Lucene
1337+
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes
1338+
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
1339+
* data iterator (which occurs after all documents have been flushed to Lucene).
1340+
*/
1341+
final Map<String, String> commitData = new HashMap<>(6);
1342+
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
1343+
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
1344+
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
1345+
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
1346+
if (syncId != null) {
1347+
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
1348+
}
1349+
if (computedMaxSeqNoEntry == null) {
1350+
// evaluated once at the time of the first invocation of this method
1351+
computedMaxSeqNoEntry = Long.toString(seqNoService().getMaxSeqNo());
1352+
}
1353+
commitData.put(MAX_SEQ_NO, computedMaxSeqNoEntry);
1354+
return commitData.entrySet().iterator();
1355+
}
1356+
});
1357+
1358+
writer.commit();
13221359

13231360
if (logger.isTraceEnabled()) {
1324-
logger.trace("committing writer with commit data (max_seq_no excluded) [{}]", commitData);
1361+
logger.trace("committed writer with commit data [{}]", commitDataAsMap(writer));
13251362
}
1326-
1327-
indexWriter.setLiveCommitData(() -> {
1328-
/**
1329-
* The user data in the commitData map contains data that must be evaluated *before*
1330-
* Lucene flushes segments, including the local and global checkpoints amongst other values.
1331-
* The maximum sequence number is different - we never want the maximum sequence number to be
1332-
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk
1333-
* of re-using a sequence number for two different documents when restoring from this commit
1334-
* point and subsequently writing new documents to the index. Since we only know which Lucene
1335-
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes
1336-
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
1337-
* data iterator (which occurs after all documents have been flushed to Lucene).
1338-
*/
1339-
final Map<String, String> deferredCommitData = new HashMap<>(commitData.size() + 1);
1340-
deferredCommitData.putAll(commitData);
1341-
deferredCommitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
1342-
return deferredCommitData.entrySet().iterator();
1343-
});
1344-
writer.commit();
13451363
} catch (Exception ex) {
13461364
try {
13471365
failEngine("lucene commit failed", ex);
@@ -1443,4 +1461,15 @@ boolean indexWriterHasDeletions() {
14431461
public boolean isRecovering() {
14441462
return pendingTranslogRecovery.get();
14451463
}
1464+
1465+
/**
1466+
* Gets the commit data from {@link IndexWriter} as a map.
1467+
*/
1468+
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
1469+
Map<String, String> commitData = new HashMap<>(6);
1470+
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
1471+
commitData.put(entry.getKey(), entry.getValue());
1472+
}
1473+
return commitData;
1474+
}
14461475
}

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
import static org.hamcrest.Matchers.equalTo;
139139
import static org.hamcrest.Matchers.everyItem;
140140
import static org.hamcrest.Matchers.greaterThan;
141+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
141142
import static org.hamcrest.Matchers.hasKey;
142143
import static org.hamcrest.Matchers.not;
143144
import static org.hamcrest.Matchers.notNullValue;
@@ -1646,18 +1647,33 @@ public void testSeqNoAndCheckpoints() throws IOException {
16461647
SequenceNumbersService.UNASSIGNED_SEQ_NO,
16471648
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL,
16481649
PRIMARY, 0, -1, false);
1650+
boolean versionConflict = false;
16491651
try {
16501652
initialEngine.index(index);
16511653
primarySeqNo++;
16521654
} catch (VersionConflictEngineException e) {
1653-
1655+
versionConflict = true;
16541656
}
16551657

16561658
replicaLocalCheckpoint =
16571659
rarely() ? replicaLocalCheckpoint : randomIntBetween(Math.toIntExact(replicaLocalCheckpoint), Math.toIntExact(primarySeqNo));
16581660
initialEngine.seqNoService().updateLocalCheckpointForShard("primary", initialEngine.seqNoService().getLocalCheckpoint());
16591661
initialEngine.seqNoService().updateLocalCheckpointForShard("replica", replicaLocalCheckpoint);
16601662

1663+
// make sure the max seq no in the latest commit hasn't advanced due to more documents having been added;
1664+
// the first time the commit data iterable gets an iterator, the max seq no from that point in time should
1665+
// remain from any subsequent call to IndexWriter#getLiveCommitData unless the commit data is overridden by a
1666+
// subsequent call to IndexWriter#setLiveCommitData.
1667+
if (initialEngine.seqNoService().getMaxSeqNo() != SequenceNumbersService.NO_OPS_PERFORMED) {
1668+
assertThat(
1669+
initialEngine.seqNoService().getMaxSeqNo(),
1670+
// its possible that right after a commit, a version conflict exception happened so the max seq no was not updated,
1671+
// so here we check greater than or equal to
1672+
versionConflict ? greaterThanOrEqualTo(initialEngine.loadSeqNoStatsFromCommit().getMaxSeqNo()) :
1673+
greaterThan(initialEngine.loadSeqNoStatsFromCommit().getMaxSeqNo())
1674+
);
1675+
}
1676+
16611677
if (rarely()) {
16621678
localCheckpoint = primarySeqNo;
16631679
maxSeqNo = primarySeqNo;

0 commit comments

Comments
 (0)