diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 30f38230bc94f..21dd799122e39 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -415,7 +415,7 @@ public enum ReplicaItemExecutionMode { static { assert Version.CURRENT.minimumCompatibilityVersion().after(Version.V_5_0_0) == false: "Remove logic handling NoOp result from primary response; see TODO in replicaItemExecutionMode" + - " as the current minimum compatible version [" + + " as the current minimum compatible version [" + Version.CURRENT.minimumCompatibilityVersion() + "] is after 5.0"; } @@ -565,7 +565,7 @@ static Engine.Index prepareIndexOperationOnReplica( final long version = primaryResponse.getVersion(); final long seqNo = primaryResponse.getSeqNo(); final SourceToParse sourceToParse = - SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), + SourceToParse.source(shardId.getIndexName(), request.type(), request.id(), request.source(), request.getContentType()) .routing(request.routing()).parent(request.parent()); final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery(); @@ -578,7 +578,7 @@ static Engine.Index prepareIndexOperationOnReplica( /** Utility method to prepare an index operation on primary shards */ private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { final SourceToParse sourceToParse = - SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), + SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType()) .routing(request.routing()).parent(request.parent()); return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), diff --git 
a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 122587949e319..755d9db68b027 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1415,6 +1415,14 @@ public interface Warmer { */ public abstract void deactivateThrottling(); + /** + * Fills up the local checkpoints history with no-ops until the local checkpoint + * and the max seen sequence ID are identical. + * @param primaryTerm the shard's primary term this engine was created for + * @return the number of no-ops added + */ + public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException; + /** * Performs recovery from the transaction log. * This operation will close the engine if the recovery fails. diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3e5d3453cacf9..583f916001d7a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -228,6 +228,28 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { logger.trace("created new InternalEngine"); } + @Override + public int fillSequenceNumberHistory(long primaryTerm) throws IOException { + try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); + final long maxSeqId = seqNoService.getMaxSeqNo(); + int numNoOpsAdded = 0; + for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId; + // the local checkpoint might have been advanced so we are leap-frogging + // to the next seq ID we need to process and create a noop for + seqNo = seqNoService.getLocalCheckpoint()+1) { + final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), 
"filling up seqNo history"); + innerNoOp(noOp); + numNoOpsAdded++; + assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:" + + seqNoService.getLocalCheckpoint(); + + } + return numNoOpsAdded; + } + } + private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE; for (Map.Entry entry : writer.getLiveCommitData()) { @@ -1074,6 +1096,7 @@ public NoOpResult noOp(final NoOp noOp) { } private NoOpResult innerNoOp(final NoOp noOp) throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); assert noOp.seqNo() > SequenceNumbersService.NO_OPS_PERFORMED; final long seqNo = noOp.seqNo(); try { diff --git a/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java b/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java index a8a983ecde832..52e3001da84cd 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java @@ -23,22 +23,15 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; public class SourceToParse { - public static SourceToParse source(String index, String type, String id, BytesReference source, XContentType contentType) { - return source(Origin.PRIMARY, index, type, id, source, contentType); - } - - public static SourceToParse source(Origin origin, String index, String type, String id, BytesReference source, + public static SourceToParse source(String index, String type, String id, BytesReference source, XContentType contentType) { - return new SourceToParse(origin, index, type, id, source, contentType); + return new SourceToParse(index, type, id, source, contentType); } - private final Origin origin; - 
private final BytesReference source; private final String index; @@ -53,8 +46,7 @@ public static SourceToParse source(Origin origin, String index, String type, Str private XContentType xContentType; - private SourceToParse(Origin origin, String index, String type, String id, BytesReference source, XContentType xContentType) { - this.origin = Objects.requireNonNull(origin); + private SourceToParse(String index, String type, String id, BytesReference source, XContentType xContentType) { this.index = Objects.requireNonNull(index); this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); @@ -64,10 +56,6 @@ private SourceToParse(Origin origin, String index, String type, String id, Bytes this.xContentType = Objects.requireNonNull(xContentType); } - public Origin origin() { - return origin; - } - public BytesReference source() { return this.source; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 6cfaca8c45b4b..8ea84e572e4bf 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -365,6 +365,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe logger.debug("failed to list file details", e); } indexShard.performTranslogRecovery(indexShouldExists); + assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm()); } indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index fe9e75f304ac7..9013bc1898d15 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ 
b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -114,6 +114,7 @@ import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -191,7 +192,7 @@ public class InternalEngineTests extends ESTestCase { - protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 1); + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); protected ThreadPool threadPool; @@ -1961,7 +1962,7 @@ private static class MockAppender extends AbstractAppender { @Override public void append(LogEvent event) { final String formattedMessage = event.getMessage().getFormattedMessage(); - if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][1] ")) { + if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][0] ")) { if (event.getLoggerName().endsWith(".IW") && formattedMessage.contains("IW: apply all deletes during flush")) { sawIndexWriterMessage = true; @@ -2341,7 +2342,7 @@ private Engine.Index indexForDoc(ParsedDocument doc) { private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, + return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } @@ -3853,4 +3854,79 @@ private Tuple getSequenceID(Engine engine, Engine.Get get) throws En } } + public void testFillUpSequenceIdGapsOnRecovery() 
throws IOException { + final int docs = randomIntBetween(1, 32); + int numDocsOnReplica = 0; + long maxSeqIDOnReplica = -1; + long checkpointOnReplica; + try { + for (int i = 0; i < docs; i++) { + final String docId = Integer.toString(i); + final ParsedDocument doc = + testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null); + Engine.Index primaryResponse = indexForDoc(doc); + Engine.IndexResult indexResult = engine.index(primaryResponse); + if (randomBoolean()) { + numDocsOnReplica++; + maxSeqIDOnReplica = indexResult.getSeqNo(); + replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false)); + } + } + checkpointOnReplica = replicaEngine.seqNoService().getLocalCheckpoint(); + } finally { + IOUtils.close(replicaEngine); + } + + + boolean flushed = false; + Engine recoveringEngine = null; + try { + assertEquals(docs-1, engine.seqNoService().getMaxSeqNo()); + assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint()); + assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo()); + assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint()); + recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations()); + recoveringEngine.recoverFromTranslog(); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2)); + + // now snapshot the tlog and ensure the primary term is updated + Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot(); + assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations()); + Translog.Operation operation; + while((operation = snapshot.next()) != null) { + if 
(operation.opType() == Translog.Operation.Type.NO_OP) { + assertEquals(2, operation.primaryTerm()); + } else { + assertEquals(1, operation.primaryTerm()); + } + + } + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + if ((flushed = randomBoolean())) { + recoveringEngine.flush(true, true); + } + } finally { + IOUtils.close(recoveringEngine); + } + + // now do it again to make sure we preserve values etc. + try { + recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + if (flushed) { + assertEquals(0, recoveringEngine.getTranslog().totalOperations()); + } + recoveringEngine.recoverFromTranslog(); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3)); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + } finally { + IOUtils.close(recoveringEngine); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 629a8af3e0d3c..dfeb081152592 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import 
org.elasticsearch.cluster.metadata.MappingMetaData; @@ -77,6 +78,7 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbersService; @@ -896,6 +898,46 @@ public void testRecoverFromStore() throws IOException { closeShards(newShard); } + /* This test just verifies that we fill up local checkpoint up to max seen seqID on primary recovery */ + public void testRecoverFromStoreWithNoOps() throws IOException { + final IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0"); + Engine.Index test = indexDoc(shard, "test", "1"); + // start a replica shard and index the second doc + final IndexShard otherShard = newStartedShard(false); + test = otherShard.prepareIndexOnReplica( + SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(), + XContentType.JSON), + 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + otherShard.index(test); + + final ShardRouting primaryShardRouting = shard.routingEntry(); + IndexShard newShard = reinitShard(otherShard, ShardRoutingHelper.initWithSameId(primaryShardRouting, + RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE)); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); + assertTrue(newShard.recoverFromStore()); + assertEquals(1, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(1, newShard.recoveryState().getTranslog().totalOperations()); + assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart()); + assertEquals(100.0f, 
newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + Translog.Snapshot snapshot = newShard.getTranslog().newSnapshot(); + Translog.Operation operation; + int numNoops = 0; + while((operation = snapshot.next()) != null) { + if (operation.opType() == Translog.Operation.Type.NO_OP) { + numNoops++; + assertEquals(1, operation.primaryTerm()); + assertEquals(0, operation.seqNo()); + } + } + assertEquals(1, numNoops); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + assertDocCount(newShard, 1); + assertDocCount(shard, 2); + closeShards(newShard, shard); + } + public void testRecoverFromCleanStore() throws IOException { final IndexShard shard = newStartedShard(true); indexDoc(shard, "test", "0"); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index bbe4d8ed12e80..261e53064fe01 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -44,7 +44,7 @@ public void testGetStartingSeqNo() throws Exception { long seqNo = 0; for (int i = 0; i < docs; i++) { Engine.Index indexOp = replica.prepareIndexOnReplica( - SourceToParse.source(SourceToParse.Origin.REPLICA, index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON), + SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON), seqNo++, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); replica.index(indexOp); if (rarely()) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 4062666ddbbf8..65576dcf0a2b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -484,7 +484,7 @@ protected Engine.Index indexDoc(IndexShard shard, String type, String id, String final Engine.Index index; if (shard.routingEntry().primary()) { index = shard.prepareIndexOnPrimary( - SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source), + SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType), Versions.MATCH_ANY, VersionType.INTERNAL, @@ -492,7 +492,7 @@ protected Engine.Index indexDoc(IndexShard shard, String type, String id, String false); } else { index = shard.prepareIndexOnReplica( - SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source), + SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType), randomInt(1 << 10), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); }