diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8b9cc8564d17a..bcfee5026ce9a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -87,7 +87,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.LongSupplier; -import java.util.function.Supplier; public class InternalEngine extends Engine { @@ -1291,37 +1290,46 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException { } } - // pkg-private for testing - IndexWriter createWriter(boolean create) throws IOException { + private IndexWriter createWriter(boolean create) throws IOException { try { - final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); - iwc.setCommitOnClose(false); // we by default don't commit on close - iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); - iwc.setIndexDeletionPolicy(deletionPolicy); - // with tests.verbose, lucene sets this up: plumb to align with filesystem stream - boolean verbose = false; - try { - verbose = Boolean.parseBoolean(System.getProperty("tests.verbose")); - } catch (Exception ignore) { - } - iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger)); - iwc.setMergeScheduler(mergeScheduler); - MergePolicy mergePolicy = config().getMergePolicy(); - // Give us the opportunity to upgrade old segments while performing - // background merges - mergePolicy = new ElasticsearchMergePolicy(mergePolicy); - iwc.setMergePolicy(mergePolicy); - iwc.setSimilarity(engineConfig.getSimilarity()); - iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); - iwc.setCodec(engineConfig.getCodec()); - iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh - return new IndexWriter(store.directory(), iwc); + final IndexWriterConfig iwc = getIndexWriterConfig(create); + return createWriter(store.directory(), iwc); } catch (LockObtainFailedException ex) { logger.warn("could not lock IndexWriter", ex); throw ex; } } + // pkg-private for testing + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return new IndexWriter(directory, iwc); + } + + private IndexWriterConfig getIndexWriterConfig(boolean create) { + final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); + iwc.setCommitOnClose(false); // we by default don't commit on close + iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); + iwc.setIndexDeletionPolicy(deletionPolicy); + // with tests.verbose, lucene sets this up: plumb to align with filesystem stream + boolean verbose = false; + try { + verbose = Boolean.parseBoolean(System.getProperty("tests.verbose")); + } catch (Exception ignore) { + } + iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger)); + iwc.setMergeScheduler(mergeScheduler); + MergePolicy mergePolicy = config().getMergePolicy(); + // Give us the opportunity to upgrade old segments while performing + // background merges + mergePolicy = new ElasticsearchMergePolicy(mergePolicy); + iwc.setMergePolicy(mergePolicy); + iwc.setSimilarity(engineConfig.getSimilarity()); + iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); + iwc.setCodec(engineConfig.getCodec()); + iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh + return iwc; + } + /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */ static final class SearchFactory extends EngineSearcherFactory { private final Engine.Warmer warmer; diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8de796a7612a0..5b4a6e10ddaf2 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -34,7 +34,6 @@ import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; -import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; @@ -129,7 +128,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.OldIndexUtils; -import org.elasticsearch.test.rest.yaml.section.Assertion; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; @@ -138,7 +136,6 @@ import java.io.IOException; import java.io.InputStream; -import java.io.Reader; import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.nio.file.DirectoryStream; @@ -152,7 +149,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; @@ -333,8 +329,9 @@ protected InternalEngine createEngine(IndexSettings indexSettings, Store store, return createEngine(indexSettings, store, translogPath, mergePolicy, null); } - protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, Supplier indexWriterSupplier) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterSupplier, null); + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null); } protected InternalEngine createEngine( @@ -342,26 +339,40 @@ protected InternalEngine createEngine( Store store, Path translogPath, MergePolicy mergePolicy, - Supplier indexWriterSupplier, - Supplier sequenceNumbersServiceSupplier) throws IOException { + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable Supplier sequenceNumbersServiceSupplier) throws IOException { EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); - InternalEngine internalEngine = new InternalEngine(config) { - @Override - IndexWriter createWriter(boolean create) throws IOException { - return (indexWriterSupplier != null) ? indexWriterSupplier.get() : super.createWriter(create); - } - - @Override - public SequenceNumbersService seqNoService() { - return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService(); - } - }; + InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config); if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); } return internalEngine; } + @FunctionalInterface + public interface IndexWriterFactory { + + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException; + } + + public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory, + @Nullable final Supplier sequenceNumbersServiceSupplier, + final EngineConfig config) { + return new InternalEngine(config) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } + + @Override + public SequenceNumbersService seqNoService() { + return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService(); + } + }; + } + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) { return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), @@ -2589,18 +2600,23 @@ public void testHandleDocumentFailure() throws Exception { final ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_1, null); final ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_1, null); - ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig()); - try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) { + AtomicReference throwingIndexWriter = new AtomicReference<>(); + try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, + (directory, iwc) -> { + throwingIndexWriter.set(new ThrowingIndexWriter(directory, iwc)); + return throwingIndexWriter.get(); + }) + ) { // test document failure while indexing if (randomBoolean()) { - throwingIndexWriter.setThrowFailure(() -> new IOException("simulated")); + throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated")); } else { - throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); + throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); } Engine.IndexResult indexResult = engine.index(indexForDoc(doc1)); assertNotNull(indexResult.getFailure()); - throwingIndexWriter.clearFailure(); + throwingIndexWriter.get().clearFailure(); indexResult = engine.index(indexForDoc(doc1)); assertNull(indexResult.getFailure()); engine.index(indexForDoc(doc2)); @@ -2608,17 +2624,17 @@ public void testHandleDocumentFailure() throws Exception { // test failure while deleting // all these simulated exceptions are not fatal to the IW so we treat them as document failures if (randomBoolean()) { - throwingIndexWriter.setThrowFailure(() -> new IOException("simulated")); + throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated")); expectThrows(IOException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1)))); } else { - throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); + throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); expectThrows(IllegalArgumentException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1)))); } // test non document level failure is thrown if (randomBoolean()) { // simulate close by corruption - throwingIndexWriter.setThrowFailure(null); + throwingIndexWriter.get().setThrowFailure(null); UncheckedIOException uncheckedIOException = expectThrows(UncheckedIOException.class, () -> { Engine.Index index = indexForDoc(doc3); index.parsedDoc().rootDoc().add(new StoredField("foo", "bar") { diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 4477320b95680..681a3758ea77d 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.shard.IndexShard; @@ -82,6 +83,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase private final Map indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); protected ReplicationGroup createGroup(int replicas) throws IOException { + IndexMetaData metaData = buildIndexMetaData(replicas); + return new ReplicationGroup(metaData); + } + + protected IndexMetaData buildIndexMetaData(int replicas) throws IOException { Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -92,7 +98,7 @@ protected ReplicationGroup createGroup(int replicas) throws IOException { for (Map.Entry typeMapping : indexMapping.entrySet()) { metaData.putMapping(typeMapping.getKey(), typeMapping.getValue()); } - return new ReplicationGroup(metaData.build()); + return metaData.build(); } protected DiscoveryNode getDiscoveryNode(String id) { @@ -109,7 +115,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable boolean closed = false; ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { - primary = newShard(shardId, true, "s0", indexMetaData, this::syncGlobalCheckpoint, null); + final ShardRouting primaryRouting = this.createShardRouting("s0", true); + primary = newShard(primaryRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(primaryRouting)); replicas = new ArrayList<>(); this.indexMetaData = indexMetaData; updateAllocationIDsOnPrimary(); @@ -118,6 +125,15 @@ protected class ReplicationGroup implements AutoCloseable, Iterable } } + private ShardRouting createShardRouting(String nodeId, boolean primary) { + return TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, + primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); + } + + protected EngineFactory getEngineFactory(ShardRouting routing) { + return null; + } + public int indexDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet())) @@ -175,8 +191,9 @@ public void startPrimary() throws IOException { } public synchronized IndexShard addReplica() throws IOException { + final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); final IndexShard replica = - newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, this::syncGlobalCheckpoint, null); + newShard(replicaRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(replicaRouting)); replicas.add(replica); updateAllocationIDsOnPrimary(); return replica; @@ -189,7 +206,8 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP false, ShardRoutingState.INITIALIZING, RecoverySource.PeerRecoverySource.INSTANCE); - final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, this::syncGlobalCheckpoint); + final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, + this::syncGlobalCheckpoint, getEngineFactory(shardRouting)); replicas.add(newReplica); updateAllocationIDsOnPrimary(); return newReplica; @@ -531,13 +549,15 @@ class GlobalCheckpointSync extends ReplicationAction blockIndexingOnPrimary = new AtomicReference<>(); + final CountDownLatch blockedIndexers = new CountDownLatch(pendingDocs); + + try (ReplicationGroup shards = new ReplicationGroup(metaData) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + if (routing.primary()) { + return new EngineFactory() { + @Override + public Engine newReadWriteEngine(EngineConfig config) { + return InternalEngineTests.createInternalEngine((directory, writerConfig) -> + new IndexWriter(directory, writerConfig) { + @Override + public long addDocument(Iterable doc) throws IOException { + Semaphore block = blockIndexingOnPrimary.get(); + if (block != null) { + blockedIndexers.countDown(); + try { + block.acquire(); + } catch (InterruptedException e) { + throw new AssertionError("unexpectedly interrupted", e); + } + } + return super.addDocument(doc); + } + + }, null, config); + } + + @Override + public Engine newReadOnlyEngine(EngineConfig config) { + throw new UnsupportedOperationException(); + } + }; + } else { + return null; + } + } + }) { + shards.startAll(); + int docs = shards.indexDocs(randomIntBetween(1,10)); + IndexShard replica = shards.getReplicas().get(0); + shards.removeReplica(replica); + closeShards(replica); + + docs += pendingDocs; + final Semaphore pendingDocsSemaphore = new Semaphore(pendingDocs); + blockIndexingOnPrimary.set(pendingDocsSemaphore); + blockIndexingOnPrimary.get().acquire(pendingDocs); + CountDownLatch pendingDocsDone = new CountDownLatch(pendingDocs); + for (int i = 0; i < pendingDocs; i++) { + final String id = "pending_" + i; + threadPool.generic().submit(() -> { + try { + shards.index(new IndexRequest(index.getName(), "type", id).source("{}")); + } catch (Exception e) { + throw new AssertionError(e); + } finally { + pendingDocsDone.countDown(); + } + }); + } + + // wait for the pending ops to "hang" + blockedIndexers.await(); + + blockIndexingOnPrimary.set(null); + // index some more + docs += shards.indexDocs(randomInt(5)); + + IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); + + CountDownLatch recoveryStart = new CountDownLatch(1); + AtomicBoolean preparedForTranslog = new AtomicBoolean(false); + final Future recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> { + recoveryStart.countDown(); + return new RecoveryTarget(indexShard, node, recoveryListener, l -> { + }) { + @Override + public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { + preparedForTranslog.set(true); + super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp); + } + }; + }); + + recoveryStart.await(); + + for (int i = 0; i < pendingDocs; i++) { + assertFalse((pendingDocs - i) + " pending operations, recovery should wait", preparedForTranslog.get()); + pendingDocsSemaphore.release(); + } + + pendingDocsDone.await(); + + // now recovery can finish + recoveryFuture.get(); + + assertThat(newReplica.recoveryState().getIndex().fileDetails(), empty()); + assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(docs)); + + shards.assertAllEqual(docs); + } + } + private static class BlockingTarget extends RecoveryTarget { private final CountDownLatch recoveryBlocked; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8dec65bdae39b..b106a308098c9 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1061,7 +1061,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { }; closeShards(shard); IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()), - shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {}); + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {}, null); recoveryShardFromStore(newShard); @@ -1202,7 +1202,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { closeShards(shard); IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()), - shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {}); + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {}, null); recoveryShardFromStore(newShard); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/EvilPeerRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/EvilPeerRecoveryIT.java deleted file mode 100644 index 913742dcbf1aa..0000000000000 --- a/core/src/test/java/org/elasticsearch/indices/recovery/EvilPeerRecoveryIT.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.recovery; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.Tokenizer; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.analysis.AnalyzerProvider; -import org.elasticsearch.index.analysis.AnalyzerScope; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.analysis.AnalysisModule; -import org.elasticsearch.plugins.AnalysisPlugin; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.equalTo; - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) -public class EvilPeerRecoveryIT extends ESIntegTestCase { - - private static AtomicReference indexLatch = new AtomicReference<>(); - private static AtomicReference waitForOpsToCompleteLatch = new AtomicReference<>(); - - @Override - protected Collection> nodePlugins() { - return Collections.singletonList(LatchAnalysisPlugin.class); - } - - public static class LatchAnalysisPlugin extends Plugin implements AnalysisPlugin { - - @Override - public Map>> getAnalyzers() { - return Collections.singletonMap("latch_analyzer", (a, b, c, d) -> new LatchAnalyzerProvider()); - } - - } - - static class LatchAnalyzerProvider implements AnalyzerProvider { - - @Override - public String name() { - return "latch_analyzer"; - } - - @Override - public AnalyzerScope scope() { - return AnalyzerScope.INDICES; - } - - @Override - public LatchAnalyzer get() { - return new LatchAnalyzer(); - } - - } - - static class LatchAnalyzer extends Analyzer { - - @Override - protected TokenStreamComponents createComponents(final String fieldName) { - return new TokenStreamComponents(new LatchTokenizer()); - } - - } - - static class LatchTokenizer extends Tokenizer { - - @Override - public final boolean incrementToken() throws IOException { - try { - if (indexLatch.get() != null) { - // latch that all exected operations are in the engine - indexLatch.get().countDown(); - } - - if (waitForOpsToCompleteLatch.get() != null) { - // latch that waits for the replica to restart and allows recovery to proceed - waitForOpsToCompleteLatch.get().await(); - } - - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - return false; - } - - } - - /* - * This tests that sequence-number-based recoveries wait for in-flight operations to complete. The trick here is simple. We latch some - * in-flight operations inside the engine after sequence numbers are assigned. While these operations are latched, we restart a replica. - * Sequence-number-based recovery on this replica has to wait until these in-flight operations complete to proceed. We verify at the end - * of recovery that a file-based recovery was not completed, and that the expected number of operations was replayed via the translog. - */ - @TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE," + - "org.elasticsearch.discovery:TRACE," + - "org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," + - "org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE") - @AwaitsFix(bugUrl = - "boaz is looking into failures: https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+java9-periodic/1545") - public void testRecoveryWaitsForOps() throws Exception { - final int docs = randomIntBetween(1, 64); - try { - internalCluster().startMasterOnlyNode(); - final String primaryNode = internalCluster().startDataOnlyNode(nodeSettings(0)); - - // prepare mapping that uses our latch analyzer - final XContentBuilder mapping = jsonBuilder(); - mapping.startObject(); - { - mapping.startObject("type"); - { - mapping.startObject("properties"); - { - mapping.startObject("foo"); - { - mapping.field("type", "text"); - mapping.field("analyzer", "latch_analyzer"); - mapping.endObject(); - } - mapping.endObject(); - } - mapping.endObject(); - } - mapping.endObject(); - } - - // create the index with our mapping - client() - .admin() - .indices() - .prepareCreate("index") - .addMapping("type", mapping) - .setSettings(Settings.builder().put("number_of_shards", 1)) - .get(); - - // start the replica node; we do this after creating the index so we can control which node is holds the primary shard - final String replicaNode = internalCluster().startDataOnlyNode(nodeSettings(1)); - ensureGreen(); - - // index some documents so that the replica will attempt a sequence-number-based recovery upon restart - for (int foo = 0; foo < docs; foo++) { - index(randomFrom(primaryNode, replicaNode), foo); - } - - if (randomBoolean()) { - client().admin().indices().flush(new FlushRequest()).get(); - } - - // start some in-flight operations that will get latched in the engine - final List threads = new ArrayList<>(); - final int latchedDocs = internalCluster().getInstance(ThreadPool.class, replicaNode).info(ThreadPool.Names.BULK).getMax(); - indexLatch.set(new CountDownLatch(latchedDocs)); - waitForOpsToCompleteLatch.set(new CountDownLatch(1)); - for (int i = docs; i < docs + latchedDocs; i++) { - final int foo = i; - // we have to index through the primary since we are going to restart the replica - final Thread thread = new Thread(() -> index(primaryNode, foo)); - threads.add(thread); - thread.start(); - } - - // latch until all operations are inside the engine - indexLatch.get().await(); - - internalCluster().restartNode(replicaNode, new InternalTestCluster.RestartCallback()); - - final Index index = resolveIndex("index"); - - // wait until recovery starts - assertBusy(() -> { - final IndicesService primaryService = internalCluster().getInstance(IndicesService.class, primaryNode); - assertThat(primaryService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1)); - final IndicesService replicaService = internalCluster().getInstance(IndicesService.class, replicaNode); - assertThat(replicaService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1)); - } - ); - - // unlatch the operations that are latched inside the engine - waitForOpsToCompleteLatch.get().countDown(); - - for (final Thread thread : threads) { - thread.join(); - } - - // recovery should complete successfully - ensureGreen(); - - // verify that a sequence-number-based recovery was completed - final org.elasticsearch.action.admin.indices.recovery.RecoveryResponse response = - client().admin().indices().prepareRecoveries("index").get(); - final List states = response.shardRecoveryStates().get("index"); - for (final RecoveryState state : states) { - if (state.getTargetNode().getName().equals(replicaNode)) { - assertThat(state.getTranslog().recoveredOperations(), equalTo(latchedDocs)); - assertThat(state.getIndex().recoveredFilesPercent(), equalTo(0f)); - } - } - } finally { - internalCluster().close(); - } - - } - - private void index(final String node, final int foo) { - client(node).prepareIndex("index", "type").setSource("{\"foo\":\"" + Integer.toString(foo) + "\"}").get(); - } - -} diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 20c869d38c6e9..01c142133a6e7 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.query.DisabledQueryCache; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; @@ -190,7 +191,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I @Nullable IndexSearcherWrapper searcherWrapper) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, indexMetaData, searcherWrapper, () -> {}); + return newShard(shardRouting, indexMetaData, searcherWrapper, () -> {}, null); } /** @@ -206,7 +207,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I @Nullable IndexSearcherWrapper searcherWrapper) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, indexMetaData, searcherWrapper, globalCheckpointSyncer); + return newShard(shardRouting, indexMetaData, searcherWrapper, globalCheckpointSyncer, null); } @@ -220,7 +221,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I */ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners) throws IOException { - return newShard(routing, indexMetaData, null, () -> {}, listeners); + return newShard(routing, indexMetaData, null, () -> {}, null, listeners); } /** @@ -235,13 +236,14 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, */ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper indexSearcherWrapper, Runnable globalCheckpointSyncer, + @Nullable EngineFactory engineFactory, IndexingOperationListener... listeners) throws IOException { // add node id as name to settings for popper logging final ShardId shardId = routing.shardId(); final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); - return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, globalCheckpointSyncer, listeners); + return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, globalCheckpointSyncer, engineFactory, listeners); } /** @@ -256,6 +258,7 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper indexSearcherWrapper, Runnable globalCheckpointSyncer, + @Nullable EngineFactory engineFactory, IndexingOperationListener... listeners) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); @@ -277,8 +280,8 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe IndexFieldDataService indexFieldDataService = new IndexFieldDataService(indexSettings, indicesFieldDataCache, new NoneCircuitBreakerService(), mapperService); indexShard = new IndexShard(routing, indexSettings, shardPath, store, indexCache, mapperService, similarityService, - indexFieldDataService, null, indexEventListener, indexSearcherWrapper, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer, - globalCheckpointSyncer, Collections.emptyList(), Arrays.asList(listeners)); + indexFieldDataService, engineFactory, indexEventListener, indexSearcherWrapper, threadPool, + BigArrays.NON_RECYCLING_INSTANCE, warmer, globalCheckpointSyncer, Collections.emptyList(), Arrays.asList(listeners)); success = true; } finally { if (success == false) { @@ -309,7 +312,7 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener.. protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException { closeShards(current); return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null, - current.getGlobalCheckpointSyncer(), listeners); + current.getGlobalCheckpointSyncer(), current.engineFactory, listeners); } /**