Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,14 @@ public interface Warmer {
*/
public abstract void deactivateThrottling();

/**
 * Fills up the local checkpoint history with no-ops until the local checkpoint
 * and the max seen sequence ID are identical.
 * @param primaryTerm the shard's primary term this engine was created for; the
 *                    no-ops are stamped with this term
 * @return the number of no-ops added
 * @throws IOException if writing a no-op to the engine fails
 */
public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException;

/**
* Performs recovery from the transaction log.
* This operation will close the engine if the recovery fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,28 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
logger.trace("created new InternalEngine");
}

@Override
public int fillSequenceNumberHistory(long primaryTerm) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long maxSeqId = seqNoService.getMaxSeqNo();
int numNoOpsAdded = 0;
for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId;
// the local checkpoint might have been advanced so we are leap-frogging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the local checkpoint must have advanced by at least one. We can assert on that after the noop was indexed.

// to the next seq ID we need to process and create a noop for
seqNo = seqNoService.getLocalCheckpoint()+1) {
final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history");
innerNoOp(noOp);
numNoOpsAdded++;
assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:"
+ seqNoService.getLocalCheckpoint();

}
return numNoOpsAdded;
}
}

private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
Expand Down Expand Up @@ -1074,6 +1096,7 @@ public NoOpResult noOp(final NoOp noOp) {
}

private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
assert noOp.seqNo() > SequenceNumbersService.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
logger.debug("failed to list file details", e);
}
indexShard.performTranslogRecovery(indexShouldExists);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm());
}
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -191,7 +192,7 @@

public class InternalEngineTests extends ESTestCase {

protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 1);
protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0);
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);

protected ThreadPool threadPool;
Expand Down Expand Up @@ -1961,7 +1962,7 @@ private static class MockAppender extends AbstractAppender {
@Override
public void append(LogEvent event) {
final String formattedMessage = event.getMessage().getFormattedMessage();
if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][1] ")) {
if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][0] ")) {
if (event.getLoggerName().endsWith(".IW") &&
formattedMessage.contains("IW: apply all deletes during flush")) {
sawIndexWriterMessage = true;
Expand Down Expand Up @@ -2341,7 +2342,7 @@ private Engine.Index indexForDoc(ParsedDocument doc) {

private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo,
boolean isRetry) {
return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL,
return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL,
Engine.Operation.Origin.REPLICA, System.nanoTime(),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
}
Expand Down Expand Up @@ -3853,4 +3854,79 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
}
}

public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
final int docs = randomIntBetween(1, 32);
int numDocsOnReplica = 0;
long maxSeqIDOnReplica = -1;
long checkpointOnReplica;
try {
for (int i = 0; i < docs; i++) {
final String docId = Integer.toString(i);
final ParsedDocument doc =
testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
Engine.Index primaryResponse = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(primaryResponse);
if (randomBoolean()) {
numDocsOnReplica++;
maxSeqIDOnReplica = indexResult.getSeqNo();
replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false));
}
}
checkpointOnReplica = replicaEngine.seqNoService().getLocalCheckpoint();
} finally {
IOUtils.close(replicaEngine);
}


boolean flushed = false;
Engine recoveringEngine = null;
try {
assertEquals(docs-1, engine.seqNoService().getMaxSeqNo());
assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint());
assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo());
assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations());
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2));

// now snapshot the tlog and ensure the primary term is updated
Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot();
assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations());
Translog.Operation operation;
while((operation = snapshot.next()) != null) {
if (operation.opType() == Translog.Operation.Type.NO_OP) {
assertEquals(2, operation.primaryTerm());
} else {
assertEquals(1, operation.primaryTerm());
}

}
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
if ((flushed = randomBoolean())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we snapshot the translog and assert that the noops have the right primary term?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I had that but removed it... good catch...

recoveringEngine.flush(true, true);
}
} finally {
IOUtils.close(recoveringEngine);
}

// now do it again to make sure we preserve values etc.
try {
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
if (flushed) {
assertEquals(0, recoveringEngine.getTranslog().totalOperations());
}
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3));
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
} finally {
IOUtils.close(recoveringEngine);
}
}
}