Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class InternalEngine extends Engine {

Expand Down Expand Up @@ -1291,37 +1290,46 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException {
}
}

// pkg-private for testing
IndexWriter createWriter(boolean create) throws IOException {
private IndexWriter createWriter(boolean create) throws IOException {
try {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
} catch (Exception ignore) {
}
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler);
MergePolicy mergePolicy = config().getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
iwc.setMergePolicy(mergePolicy);
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
return new IndexWriter(store.directory(), iwc);
final IndexWriterConfig iwc = getIndexWriterConfig(create);
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
throw ex;
}
}

// pkg-private for testing
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return new IndexWriter(directory, iwc);
}

private IndexWriterConfig getIndexWriterConfig(boolean create) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
} catch (Exception ignore) {
}
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler);
MergePolicy mergePolicy = config().getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
iwc.setMergePolicy(mergePolicy);
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
return iwc;
}

/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
Expand Down Expand Up @@ -129,7 +128,6 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.OldIndexUtils;
import org.elasticsearch.test.rest.yaml.section.Assertion;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert;
Expand All @@ -138,7 +136,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
Expand All @@ -152,7 +149,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -333,35 +329,50 @@ protected InternalEngine createEngine(IndexSettings indexSettings, Store store,
return createEngine(indexSettings, store, translogPath, mergePolicy, null);

}
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, Supplier<IndexWriter> indexWriterSupplier) throws IOException {
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterSupplier, null);
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
@Nullable IndexWriterFactory indexWriterFactory) throws IOException {
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null);
}

protected InternalEngine createEngine(
IndexSettings indexSettings,
Store store,
Path translogPath,
MergePolicy mergePolicy,
Supplier<IndexWriter> indexWriterSupplier,
Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
@Nullable IndexWriterFactory indexWriterFactory,
@Nullable Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
InternalEngine internalEngine = new InternalEngine(config) {
@Override
IndexWriter createWriter(boolean create) throws IOException {
return (indexWriterSupplier != null) ? indexWriterSupplier.get() : super.createWriter(create);
}

@Override
public SequenceNumbersService seqNoService() {
return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService();
}
};
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config);
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
internalEngine.recoverFromTranslog();
}
return internalEngine;
}

@FunctionalInterface
public interface IndexWriterFactory {

IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException;
}

public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory,
@Nullable final Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier,
final EngineConfig config) {
return new InternalEngine(config) {
@Override
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return (indexWriterFactory != null) ?
indexWriterFactory.createWriter(directory, iwc) :
super.createWriter(directory, iwc);
}

@Override
public SequenceNumbersService seqNoService() {
return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService();
}
};
}

public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) {
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(),
Expand Down Expand Up @@ -2589,36 +2600,41 @@ public void testHandleDocumentFailure() throws Exception {
final ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_1, null);
final ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_1, null);

ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig());
try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) {
AtomicReference<ThrowingIndexWriter> throwingIndexWriter = new AtomicReference<>();
try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
(directory, iwc) -> {
throwingIndexWriter.set(new ThrowingIndexWriter(directory, iwc));
return throwingIndexWriter.get();
})
) {
// test document failure while indexing
if (randomBoolean()) {
throwingIndexWriter.setThrowFailure(() -> new IOException("simulated"));
throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
} else {
throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
}
Engine.IndexResult indexResult = engine.index(indexForDoc(doc1));
assertNotNull(indexResult.getFailure());

throwingIndexWriter.clearFailure();
throwingIndexWriter.get().clearFailure();
indexResult = engine.index(indexForDoc(doc1));
assertNull(indexResult.getFailure());
engine.index(indexForDoc(doc2));

// test failure while deleting
// all these simulated exceptions are not fatal to the IW so we treat them as document failures
if (randomBoolean()) {
throwingIndexWriter.setThrowFailure(() -> new IOException("simulated"));
throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
expectThrows(IOException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1))));
} else {
throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
expectThrows(IllegalArgumentException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1))));
}

// test non document level failure is thrown
if (randomBoolean()) {
// simulate close by corruption
throwingIndexWriter.setThrowFailure(null);
throwingIndexWriter.get().setThrowFailure(null);
UncheckedIOException uncheckedIOException = expectThrows(UncheckedIOException.class, () -> {
Engine.Index index = indexForDoc(doc3);
index.parsedDoc().rootDoc().add(new StoredField("foo", "bar") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -82,6 +83,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");

protected ReplicationGroup createGroup(int replicas) throws IOException {
IndexMetaData metaData = buildIndexMetaData(replicas);
return new ReplicationGroup(metaData);
}

protected IndexMetaData buildIndexMetaData(int replicas) throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
Expand All @@ -92,7 +98,7 @@ protected ReplicationGroup createGroup(int replicas) throws IOException {
for (Map.Entry<String, String> typeMapping : indexMapping.entrySet()) {
metaData.putMapping(typeMapping.getKey(), typeMapping.getValue());
}
return new ReplicationGroup(metaData.build());
return metaData.build();
}

protected DiscoveryNode getDiscoveryNode(String id) {
Expand All @@ -109,7 +115,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard>
boolean closed = false;

ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
primary = newShard(shardId, true, "s0", indexMetaData, this::syncGlobalCheckpoint, null);
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
primary = newShard(primaryRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(primaryRouting));
replicas = new ArrayList<>();
this.indexMetaData = indexMetaData;
updateAllocationIDsOnPrimary();
Expand All @@ -118,6 +125,15 @@ protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard>
}
}

private ShardRouting createShardRouting(String nodeId, boolean primary) {
return TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
}

protected EngineFactory getEngineFactory(ShardRouting routing) {
return null;
}

public int indexDocs(final int numOfDoc) throws Exception {
for (int doc = 0; doc < numOfDoc; doc++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet()))
Expand Down Expand Up @@ -175,8 +191,9 @@ public void startPrimary() throws IOException {
}

public synchronized IndexShard addReplica() throws IOException {
final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false);
final IndexShard replica =
newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, this::syncGlobalCheckpoint, null);
newShard(replicaRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(replicaRouting));
replicas.add(replica);
updateAllocationIDsOnPrimary();
return replica;
Expand All @@ -189,7 +206,8 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP
false, ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE);

final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, this::syncGlobalCheckpoint);
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null,
this::syncGlobalCheckpoint, getEngineFactory(shardRouting));
replicas.add(newReplica);
updateAllocationIDsOnPrimary();
return newReplica;
Expand Down Expand Up @@ -531,13 +549,15 @@ class GlobalCheckpointSync extends ReplicationAction<GlobalCheckpointSyncAction.
@Override
protected PrimaryResult performOnPrimary(IndexShard primary,
GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception {
primary.getTranslog().sync();
return new PrimaryResult(new GlobalCheckpointSyncAction.ReplicaRequest(request, primary.getGlobalCheckpoint()),
new ReplicationResponse());
}

@Override
protected void performOnReplica(GlobalCheckpointSyncAction.ReplicaRequest request, IndexShard replica) {
protected void performOnReplica(GlobalCheckpointSyncAction.ReplicaRequest request, IndexShard replica) throws IOException {
replica.updateGlobalCheckpointOnReplica(request.getCheckpoint());
replica.getTranslog().sync();
}
}

Expand Down
Loading