From 736be72eaa7db4cf58a1ee82cb9f5252af20bb3a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 20 Apr 2017 20:59:10 +0200 Subject: [PATCH 1/5] Fill missing sequence IDs up to max sequence ID when recovering from store Today we might promote a primary and recover from store where after translog recovery the local checkpoint is still behind the maximum sequence ID seen. To fill the holes in the sequence ID history this PR adds a utility method that fills up all missing sequence IDs up to the maximum seen sequence ID with no-ops. --- .../elasticsearch/index/engine/Engine.java | 8 +++ .../index/engine/InternalEngine.java | 22 ++++++ .../index/shard/StoreRecovery.java | 2 + .../index/engine/InternalEngineTests.java | 70 ++++++++++++++++++- 4 files changed, 99 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 122587949e319..755d9db68b027 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1415,6 +1415,14 @@ public interface Warmer { */ public abstract void deactivateThrottling(); + /** + * Fills up the local checkpoints history with no-ops until the local checkpoint + * and the max seen sequence ID are identical. + * @param primaryTerm the shards primary term this engine was created for + * @return the number of no-ops added + */ + public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException; + /** * Performs recovery from the transaction log. * This operation will close the engine if the recovery fails. diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3e5d3453cacf9..a4110c25ab11e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -228,6 +228,27 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { logger.trace("created new InternalEngine"); } + @Override + public int fillSequenceNumberHistory(long primaryTerm) throws IOException { + try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); + final long maxSeqId = seqNoService.getMaxSeqNo(); + int numNoOpsAdded = 0; + for (long i = localCheckpoint + 1; i <= maxSeqId; + // the local checkpoint might have been advanced so we are leap-frogging + // to the next seq ID we need to process and create a noop for + i = Math.max(seqNoService.getLocalCheckpoint(), i) + 1) { + final NoOp noOp = new NoOp(null, i, primaryTerm, 0, VersionType.INTERNAL, Operation.Origin.PRIMARY, System.nanoTime(), + "filling up seqNo history"); + innerNoOp(noOp); + numNoOpsAdded++; + + } + return numNoOpsAdded; + } + } + private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE; for (Map.Entry entry : writer.getLiveCommitData()) { @@ -1074,6 +1095,7 @@ public NoOpResult noOp(final NoOp noOp) { } private NoOpResult innerNoOp(final NoOp noOp) throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); assert noOp.seqNo() > SequenceNumbersService.NO_OPS_PERFORMED; final long seqNo = noOp.seqNo(); try { diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 6cfaca8c45b4b..8ea84e572e4bf 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -365,6 +365,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe logger.debug("failed to list file details", e); } indexShard.performTranslogRecovery(indexShouldExists); + assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm()); } indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index fe9e75f304ac7..42049b21542e8 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -114,6 +114,7 @@ import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -191,7 +192,7 @@ public class InternalEngineTests extends ESTestCase { - protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 1); + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); protected ThreadPool threadPool; @@ -1961,7 +1962,7 @@ private static class MockAppender extends AbstractAppender { @Override public void append(LogEvent event) { final String formattedMessage = event.getMessage().getFormattedMessage(); - if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][1] ")) { + if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][0] ")) { if (event.getLoggerName().endsWith(".IW") && formattedMessage.contains("IW: apply all deletes during flush")) { sawIndexWriterMessage = true; @@ -2341,7 +2342,7 @@ private Engine.Index indexForDoc(ParsedDocument doc) { private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, + return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } @@ -3853,4 +3854,67 @@ private Tuple getSequenceID(Engine engine, Engine.Get get) throws En } } + public void testFillUpSequenceIdGapsOnRecovery() throws IOException { + final int docs = randomIntBetween(1, 32); + int numDocsOnReplica = 0; + long maxSeqIDOnReplica = -1; + long checkpointOnReplica; + try { + for (int i = 0; i < docs; i++) { + final String docId = Integer.toString(i); + final ParsedDocument doc = + testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null); + Engine.Index primaryResponse = indexForDoc(doc); + Engine.IndexResult indexResult = engine.index(primaryResponse); + if (randomBoolean()) { + doc.updateSeqID(indexResult.getSeqNo(), 1); + numDocsOnReplica++; + maxSeqIDOnReplica = indexResult.getSeqNo(); + replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false)); + } + } + checkpointOnReplica = replicaEngine.seqNoService().getLocalCheckpoint(); + } finally { + IOUtils.close(replicaEngine); + } + + + boolean flushed = false; + Engine recoveringEngine = null; + try { + assertEquals(docs-1, engine.seqNoService().getMaxSeqNo()); + assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint()); + assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo()); + assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint()); + recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations()); + recoveringEngine.recoverFromTranslog(); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2)); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + if ((flushed = randomBoolean())) { + recoveringEngine.flush(true, true); + } + } finally { + IOUtils.close(recoveringEngine); + } + + // now do it again to make sure we preserve values etc. + try { + recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + if (flushed) { + assertEquals(0, recoveringEngine.getTranslog().totalOperations()); + } + recoveringEngine.recoverFromTranslog(); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3)); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + } finally { + IOUtils.close(recoveringEngine); + } + } } From 4f8b4c5bdc201d91d7ae0508cada400260c9d057 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 21 Apr 2017 14:19:28 +0200 Subject: [PATCH 2/5] fix compile errors --- .../org/elasticsearch/index/engine/InternalEngine.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a4110c25ab11e..16dd75030d01b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -235,12 +235,11 @@ public int fillSequenceNumberHistory(long primaryTerm) throws IOException { final long localCheckpoint = seqNoService.getLocalCheckpoint(); final long maxSeqId = seqNoService.getMaxSeqNo(); int numNoOpsAdded = 0; - for (long i = localCheckpoint + 1; i <= maxSeqId; + for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId; // the local checkpoint might have been advanced so we are leap-frogging // to the next seq ID we need to process and create a noop for - i = Math.max(seqNoService.getLocalCheckpoint(), i) + 1) { - final NoOp noOp = new NoOp(null, i, primaryTerm, 0, VersionType.INTERNAL, Operation.Origin.PRIMARY, System.nanoTime(), - "filling up seqNo history"); + seqNo = Math.max(seqNoService.getLocalCheckpoint(), seqNo) + 1) { + final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history"); innerNoOp(noOp); numNoOpsAdded++; From 09fff064b23f3904f84675bbee17a7b5a638a043 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 21 Apr 2017 14:58:41 +0200 Subject: [PATCH 3/5] apply feedback --- .../elasticsearch/index/engine/InternalEngine.java | 4 +++- .../index/engine/InternalEngineTests.java | 14 +++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 16dd75030d01b..583f916001d7a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -238,10 +238,12 @@ public int fillSequenceNumberHistory(long primaryTerm) throws IOException { for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId; // the local checkpoint might have been advanced so we are leap-frogging // to the next seq ID we need to process and create a noop for - seqNo = Math.max(seqNoService.getLocalCheckpoint(), seqNo) + 1) { + seqNo = seqNoService.getLocalCheckpoint()+1) { final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history"); innerNoOp(noOp); numNoOpsAdded++; + assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:" + + seqNoService.getLocalCheckpoint(); } return numNoOpsAdded; diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 42049b21542e8..9013bc1898d15 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3867,7 +3867,6 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { Engine.Index primaryResponse = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(primaryResponse); if (randomBoolean()) { - doc.updateSeqID(indexResult.getSeqNo(), 1); numDocsOnReplica++; maxSeqIDOnReplica = indexResult.getSeqNo(); replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false)); @@ -3892,6 +3891,19 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2)); + + // now snapshot the tlog and ensure the primary term is updated + Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot(); + assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations()); + Translog.Operation operation; + while((operation = snapshot.next()) != null) { + if (operation.opType() == Translog.Operation.Type.NO_OP) { + assertEquals(2, operation.primaryTerm()); + } else { + assertEquals(1, operation.primaryTerm()); + } + + } assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); if ((flushed = randomBoolean())) { From 28edc5cda55dbe4071d8586529264dd9f085103c Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 21 Apr 2017 17:26:56 +0200 Subject: [PATCH 4/5] add a test to check if we fill up on store recovery --- .../index/shard/IndexShardTests.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 629a8af3e0d3c..2523bbe68beb6 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -77,6 +78,7 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbersService; @@ -896,6 +898,46 @@ public void testRecoverFromStore() throws IOException { closeShards(newShard); } + /* This test just verifies that we fill up local checkpoint up to max seen seqID on primary recovery */ + public void testRecoverFromStoreWithNoOps() throws IOException { + final IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0"); + Engine.Index test = indexDoc(shard, "test", "1"); + // start a replica shard and index the second doc + final IndexShard otherShard = newStartedShard(false); + test = otherShard.prepareIndexOnReplica( + SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), test.type(), test.id(), test.source(), + XContentType.JSON), + 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + otherShard.index(test); + + final ShardRouting primaryShardRouting = shard.routingEntry(); + IndexShard newShard = reinitShard(otherShard, ShardRoutingHelper.initWithSameId(primaryShardRouting, + RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE)); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); + assertTrue(newShard.recoverFromStore()); + assertEquals(1, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(1, newShard.recoveryState().getTranslog().totalOperations()); + assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart()); + assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + Translog.Snapshot snapshot = newShard.getTranslog().newSnapshot(); + Translog.Operation operation; + int numNoops = 0; + while((operation = snapshot.next()) != null) { + if (operation.opType() == Translog.Operation.Type.NO_OP) { + numNoops++; + assertEquals(1, operation.primaryTerm()); + assertEquals(0, operation.seqNo()); + } + } + assertEquals(1, numNoops); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + assertDocCount(newShard, 1); + assertDocCount(shard, 2); + closeShards(newShard, shard); + } + public void testRecoverFromCleanStore() throws IOException { final IndexShard shard = newStartedShard(true); indexDoc(shard, "test", "0"); From 1beb75761c1d7f3aaf550a5126b9895f94acf55f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 21 Apr 2017 17:43:57 +0200 Subject: [PATCH 5/5] apply feedback --- .../action/bulk/TransportShardBulkAction.java | 6 +++--- .../index/mapper/SourceToParse.java | 18 +++--------------- .../index/shard/IndexShardTests.java | 2 +- .../PeerRecoveryTargetServiceTests.java | 2 +- .../index/shard/IndexShardTestCase.java | 4 ++-- 5 files changed, 10 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 30f38230bc94f..21dd799122e39 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -415,7 +415,7 @@ public enum ReplicaItemExecutionMode { static { assert Version.CURRENT.minimumCompatibilityVersion().after(Version.V_5_0_0) == false: "Remove logic handling NoOp result from primary response; see TODO in replicaItemExecutionMode" + - " as the current minimum compatible version [" + + " as the current minimum compatible version [" + Version.CURRENT.minimumCompatibilityVersion() + "] is after 5.0"; } @@ -565,7 +565,7 @@ static Engine.Index prepareIndexOperationOnReplica( final long version = primaryResponse.getVersion(); final long seqNo = primaryResponse.getSeqNo(); final SourceToParse sourceToParse = - SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), + SourceToParse.source(shardId.getIndexName(), request.type(), request.id(), request.source(), request.getContentType()) .routing(request.routing()).parent(request.parent()); final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery(); @@ -578,7 +578,7 @@ static Engine.Index prepareIndexOperationOnReplica( /** Utility method to prepare an index operation on primary shards */ private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { final SourceToParse sourceToParse = - SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), + SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType()) .routing(request.routing()).parent(request.parent()); return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java b/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java index a8a983ecde832..52e3001da84cd 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java @@ -23,22 +23,15 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; public class SourceToParse { - public static SourceToParse source(String index, String type, String id, BytesReference source, XContentType contentType) { - return source(Origin.PRIMARY, index, type, id, source, contentType); - } - - public static SourceToParse source(Origin origin, String index, String type, String id, BytesReference source, + public static SourceToParse source(String index, String type, String id, BytesReference source, XContentType contentType) { - return new SourceToParse(origin, index, type, id, source, contentType); + return new SourceToParse(index, type, id, source, contentType); } - private final Origin origin; - private final BytesReference source; private final String index; @@ -53,8 +46,7 @@ public static SourceToParse source(Origin origin, String index, String type, Str private XContentType xContentType; - private SourceToParse(Origin origin, String index, String type, String id, BytesReference source, XContentType xContentType) { - this.origin = Objects.requireNonNull(origin); + private SourceToParse(String index, String type, String id, BytesReference source, XContentType xContentType) { this.index = Objects.requireNonNull(index); this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); @@ -64,10 +56,6 @@ private SourceToParse(Origin origin, String index, String type, String id, Bytes this.xContentType = Objects.requireNonNull(xContentType); } - public Origin origin() { - return origin; - } - public BytesReference source() { return this.source; } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 2523bbe68beb6..dfeb081152592 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -906,7 +906,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { // start a replica shard and index the second doc final IndexShard otherShard = newStartedShard(false); test = otherShard.prepareIndexOnReplica( - SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), test.type(), test.id(), test.source(), + SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(), XContentType.JSON), 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); otherShard.index(test); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index bbe4d8ed12e80..261e53064fe01 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -44,7 +44,7 @@ public void testGetStartingSeqNo() throws Exception { long seqNo = 0; for (int i = 0; i < docs; i++) { Engine.Index indexOp = replica.prepareIndexOnReplica( - SourceToParse.source(SourceToParse.Origin.REPLICA, index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON), + SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON), seqNo++, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); replica.index(indexOp); if (rarely()) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 4062666ddbbf8..65576dcf0a2b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -484,7 +484,7 @@ protected Engine.Index indexDoc(IndexShard shard, String type, String id, String final Engine.Index index; if (shard.routingEntry().primary()) { index = shard.prepareIndexOnPrimary( - SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source), + SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType), Versions.MATCH_ANY, VersionType.INTERNAL, @@ -492,7 +492,7 @@ protected Engine.Index indexDoc(IndexShard shard, String type, String id, String false); } else { index = shard.prepareIndexOnReplica( - SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source), + SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType), randomInt(1 << 10), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); }