diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index 91fac0908ef7f..87b1e90a4d517 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -93,7 +93,7 @@ which returns something similar to: { "commit" : { "id" : "3M3zkw2GHMo2Y4h4/KFKCg==", - "generation" : 3, + "generation" : 4, "user_data" : { "translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA", "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ", diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 4fe50b983dfac..64760629bfd24 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -24,7 +24,6 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.Accountable; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -40,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLockObtainFailedException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java b/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java deleted file mode 100644 index f7f3aa8e9fe1d..0000000000000 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.engine; - -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.store.Directory; -import org.elasticsearch.Assertions; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -/** - * This class contains utility methods for mutating the shard lucene index and translog as a preparation to be opened. - */ -public abstract class EngineDiskUtils { - - /** - * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. - */ - public static void createEmpty(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException { - try (IndexWriter writer = newIndexWriter(true, dir)) { - final String translogUuid = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId); - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); - map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); - map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); - map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); - map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1"); - updateCommitData(writer, map); - } - } - - - /** - * Converts an existing lucene index and marks it with a new history uuid. Also creates a new empty translog file. - * This is used to make sure no existing shard will recovery from this index using ops based recovery. - */ - public static void bootstrapNewHistoryFromLuceneIndex(final Directory dir, final Path translogPath, final ShardId shardId) - throws IOException { - try (IndexWriter writer = newIndexWriter(false, dir)) { - final Map userData = getUserData(writer); - final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); - final String translogUuid = Translog.createEmptyTranslog(translogPath, maxSeqNo, shardId); - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); - map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); - map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); - updateCommitData(writer, map); - } - } - - /** - * Creates a new empty translog and associates it with an existing lucene index. - */ - public static void createNewTranslog(final Directory dir, final Path translogPath, long initialGlobalCheckpoint, final ShardId shardId) - throws IOException { - if (Assertions.ENABLED) { - final List existingCommits = DirectoryReader.listCommits(dir); - assert existingCommits.size() == 1 : "creating a translog translog should have one commit, commits[" + existingCommits + "]"; - SequenceNumbers.CommitInfo commitInfo = Store.loadSeqNoInfo(existingCommits.get(0)); - assert commitInfo.localCheckpoint >= initialGlobalCheckpoint : - "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint [" - + initialGlobalCheckpoint + "]"; - } - - try (IndexWriter writer = newIndexWriter(false, dir)) { - final String translogUuid = Translog.createEmptyTranslog(translogPath, initialGlobalCheckpoint, shardId); - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); - updateCommitData(writer, map); - } - } - - - /** - * Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed. - */ - public static void ensureIndexHasHistoryUUID(final Directory dir) throws IOException { - try (IndexWriter writer = newIndexWriter(false, dir)) { - final Map userData = getUserData(writer); - if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) { - updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID())); - } - } - } - - private static void updateCommitData(IndexWriter writer, Map keysToUpdate) throws IOException { - final Map userData = getUserData(writer); - userData.putAll(keysToUpdate); - writer.setLiveCommitData(userData.entrySet()); - writer.commit(); - } - - private static Map getUserData(IndexWriter writer) { - final Map userData = new HashMap<>(); - writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); - return userData; - } - - private static IndexWriter newIndexWriter(final boolean create, final Directory dir) throws IOException { - IndexWriterConfig iwc = new IndexWriterConfig(null) - .setCommitOnClose(false) - // we don't want merges to happen here - we call maybe merge on the engine - // later once we stared it up otherwise we would need to wait for it here - // we also don't specify a codec here and merges should use the engines for this index - .setMergePolicy(NoMergePolicy.INSTANCE) - .setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); - return new IndexWriter(dir, iwc); - } -} diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 224ae60a420d1..3654aeba2bf8d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -40,13 +40,13 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; @@ -390,7 +390,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe recoveryState.getIndex().updateVersion(version); if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { assert indexShouldExists; - EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), indexShard.shardPath().resolveTranslog(), shardId); + store.bootstrapNewHistory(); + final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); + final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId); + store.associateIndexWithNewTranslog(translogUUID); } else if (indexShouldExists) { // since we recover from local, just fill the files and size try { @@ -402,7 +406,10 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe logger.debug("failed to list file details", e); } } else { - EngineDiskUtils.createEmpty(store.directory(), indexShard.shardPath().resolveTranslog(), shardId); + store.createEmpty(); + final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), + SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); } indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); @@ -445,8 +452,12 @@ private void restore(final IndexShard indexShard, final Repository repository, f } final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); - EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(indexShard.store().directory(), indexShard.shardPath().resolveTranslog(), - shardId); + final Store store = indexShard.store(); + store.bootstrapNewHistory(); + final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); + final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId); + store.associateIndexWithNewTranslog(translogUUID); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index e560a0b040b0b..297790890c1b0 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -30,6 +30,8 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; @@ -46,7 +48,6 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.Version; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -69,11 +70,13 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; @@ -155,7 +158,8 @@ public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService dire this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY); } - public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock, OnClose onClose) throws IOException { + public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock, + OnClose onClose) throws IOException { super(shardId, indexSettings); final Settings settings = indexSettings.getSettings(); this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId)); @@ -1454,4 +1458,100 @@ private static long estimateSize(Directory directory) throws IOException { } } + /** + * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. + */ + public void createEmpty() throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory)) { + final Map map = new HashMap<>(); + map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); + map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); + map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1"); + updateCommitData(writer, map); + } finally { + metadataLock.writeLock().unlock(); + } + } + + + /** + * Marks an existing lucene index with a new history uuid. + * This is used to make sure no existing shard will recovery from this index using ops based recovery. + */ + public void bootstrapNewHistory() throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + final Map userData = getUserData(writer); + final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); + final Map map = new HashMap<>(); + map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); + updateCommitData(writer, map); + } finally { + metadataLock.writeLock().unlock(); + } + } + + /** + * Force bakes the given translog generation as recovery information in the lucene index. This is + * used when recovering from a snapshot or peer file based recovery where a new empty translog is + * created and the existing lucene index needs should be changed to use it. + */ + public void associateIndexWithNewTranslog(final String translogUUID) throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) { + throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]"); + } + final Map map = new HashMap<>(); + map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); + map.put(Translog.TRANSLOG_UUID_KEY, translogUUID); + updateCommitData(writer, map); + } finally { + metadataLock.writeLock().unlock(); + } + } + + + /** + * Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed. + */ + public void ensureIndexHasHistoryUUID() throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + final Map userData = getUserData(writer); + if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) { + updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID())); + } + } finally { + metadataLock.writeLock().unlock(); + } + } + + private void updateCommitData(IndexWriter writer, Map keysToUpdate) throws IOException { + final Map userData = getUserData(writer); + userData.putAll(keysToUpdate); + writer.setLiveCommitData(userData.entrySet()); + writer.commit(); + } + + private Map getUserData(IndexWriter writer) { + final Map userData = new HashMap<>(); + writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); + return userData; + } + + private IndexWriter newIndexWriter(IndexWriterConfig.OpenMode openMode, final Directory dir) throws IOException { + IndexWriterConfig iwc = new IndexWriterConfig(null) + .setCommitOnClose(false) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(openMode); + return new IndexWriter(dir, iwc); + } + } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index b28e992d9fd5d..91d3332f8e646 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -40,7 +40,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -439,11 +438,12 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) { - EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory()); + store.ensureIndexHasHistoryUUID(); } // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 - EngineDiskUtils.createNewTranslog(store.directory(), indexShard.shardPath().resolveTranslog(), - SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + final String translogUUID = + Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + store.associateIndexWithNewTranslog(translogUUID); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java deleted file mode 100644 index aca94708af9f8..0000000000000 --- a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.engine; - -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.search.IndexSearcher; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.test.IndexSettingsModule; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; - -public class EngineDiskUtilsTests extends EngineTestCase { - - - public void testHistoryUUIDIsSetIfMissing() throws IOException { - final int numDocs = randomIntBetween(0, 3); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult index = engine.index(firstIndexRequest); - assertThat(index.getVersion(), equalTo(1L)); - } - assertVisibleCount(engine, numDocs); - engine.close(); - - IndexWriterConfig iwc = new IndexWriterConfig(null) - .setCommitOnClose(false) - // we don't want merges to happen here - we call maybe merge on the engine - // later once we stared it up otherwise we would need to wait for it here - // we also don't specify a codec here and merges should use the engines for this index - .setMergePolicy(NoMergePolicy.INSTANCE) - .setOpenMode(IndexWriterConfig.OpenMode.APPEND); - try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) { - Map newCommitData = new HashMap<>(); - for (Map.Entry entry : writer.getLiveCommitData()) { - if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) { - newCommitData.put(entry.getKey(), entry.getValue()); - } - } - writer.setLiveCommitData(newCommitData.entrySet()); - writer.commit(); - } - - EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory()); - - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1) - .build()); - - EngineConfig config = engine.config(); - EngineConfig newConfig = new EngineConfig( - shardId, allocationId.getId(), - threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), - new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED); - engine = new InternalEngine(newConfig); - engine.recoverFromTranslog(); - assertVisibleCount(engine, numDocs, false); - assertThat(engine.getHistoryUUID(), notNullValue()); - } - - public void testCurrentTranslogIDisCommitted() throws IOException { - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - - // create - { - EngineDiskUtils.createEmpty(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId); - ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - - try (InternalEngine engine = createEngine(config)) { - engine.index(firstIndexRequest); - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); - expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - // open and recover tlog - { - for (int i = 0; i < 2; i++) { - try (InternalEngine engine = new InternalEngine(config)) { - assertTrue(engine.isRecovering()); - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - if (i == 0) { - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } else { - // creating an empty index will create the first translog gen and commit it - // opening the empty index will make the second translog file but not commit it - // opening the engine again (i=0) will make the third translog file, which then be committed - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(); - userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - } - // open index with new tlog - { - EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(), - SequenceNumbers.NO_OPS_PERFORMED, shardId); - try (InternalEngine engine = new InternalEngine(config)) { - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(); - assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); - } - } - - // open and recover tlog with empty tlog - { - for (int i = 0; i < 2; i++) { - try (InternalEngine engine = new InternalEngine(config)) { - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(); - userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - } - } - } - - public void testHistoryUUIDCanBeForced() throws IOException { - final int numDocs = randomIntBetween(0, 3); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult index = engine.index(firstIndexRequest); - assertThat(index.getVersion(), equalTo(1L)); - } - assertVisibleCount(engine, numDocs); - final String oldHistoryUUID = engine.getHistoryUUID(); - engine.close(); - EngineConfig config = engine.config(); - EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId); - - EngineConfig newConfig = new EngineConfig( - shardId, allocationId.getId(), - threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), - new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED); - engine = new InternalEngine(newConfig); - engine.recoverFromTranslog(); - assertVisibleCount(engine, 0, false); - assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID))); - } -} diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index bc4ecbee4d6a8..c48a9ee8a2d19 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -65,7 +65,6 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; @@ -91,6 +90,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; @@ -1141,8 +1141,9 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { engine.flushAndClose(); } if (randomBoolean()) { - EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(), + final String translogUUID = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + store.associateIndexWithNewTranslog(translogUUID); } engine = new InternalEngine(config); engine.recoverFromTranslog(); @@ -2354,6 +2355,84 @@ public void testSettings() { assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); } + public void testCurrentTranslogIDisCommitted() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + + // create + { + store.createEmpty(); + final String translogUUID = + Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); + ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, + Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + + try (InternalEngine engine = createEngine(config)) { + engine.index(firstIndexRequest); + globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + // open and recover tlog + { + for (int i = 0; i < 2; i++) { + try (InternalEngine engine = new InternalEngine(config)) { + assertTrue(engine.isRecovering()); + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + if (i == 0) { + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + } else { + // creating an empty index will create the first translog gen and commit it + // opening the empty index will make the second translog file but not commit it + // opening the engine again (i=0) will make the third translog file, which then be committed + assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + } + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + } + // open index with new tlog + { + final String translogUUID = + Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); + try (InternalEngine engine = new InternalEngine(config)) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + assertEquals(2, engine.getTranslog().currentFileGeneration()); + assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); + } + } + + // open and recover tlog with empty tlog + { + for (int i = 0; i < 2; i++) { + try (InternalEngine engine = new InternalEngine(config)) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + } + } + } + public void testMissingTranslog() throws IOException { // test that we can force start the engine , even if the translog is missing. engine.close(); @@ -2369,7 +2448,8 @@ public void testMissingTranslog() throws IOException { // expected } // when a new translog is created it should be ok - EngineDiskUtils.createNewTranslog(store.directory(), primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + store.associateIndexWithNewTranslog(translogUUID); EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null); engine = new InternalEngine(config); } @@ -2432,7 +2512,9 @@ public void testTranslogCleanUpPostCommitCrash() throws Exception { final Path translogPath = createTempDir(); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); - EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId); + store.createEmpty(); + final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId); + store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)) { @@ -3223,7 +3305,8 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { } try (Store store = createStore(newFSDirectory(storeDir))) { if (randomBoolean() || true) { - EngineDiskUtils.createNewTranslog(store.directory(), translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId); + final String translogUUID = Translog.createEmptyTranslog(translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); } try (Engine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); @@ -4025,10 +4108,12 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception { final Path translogPath = createTempDir(); store = createStore(); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + store.createEmpty(); + final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId); + store.associateIndexWithNewTranslog(translogUUID); final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get()); - EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId); try (Engine engine = new InternalEngine(engineConfig) { @Override protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { @@ -4042,7 +4127,6 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s }) { engine.recoverFromTranslog(); int numDocs = scaledRandomIntBetween(10, 100); - final String translogUUID = engine.getTranslog().getTranslogUUID(); for (int docId = 0; docId < numDocs; docId++) { ParseContext.Document document = testDocumentWithTextField(); document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f05fdc60c5cf7..822294a9c19f7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -29,7 +29,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Constants; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -70,6 +69,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -1059,27 +1059,27 @@ public void testSnapshotStore() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); assertTrue(newShard.recoverFromStore()); snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); newShard.close("test", false); snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); closeShards(newShard); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 1f9c5ae6df359..0609477dda8e5 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -26,7 +26,6 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -37,12 +36,12 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -52,6 +51,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; @@ -121,7 +121,10 @@ public void onFailedEngine(String reason, @Nullable Exception e) { // we don't need to notify anybody in this test } }; - EngineDiskUtils.createEmpty(store.directory(), translogConfig.getTranslogPath(), shardId); + store.createEmpty(); + final String translogUUID = + Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); EngineConfig config = new EngineConfig(shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 392227396de15..9352d978e6e46 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -48,7 +48,6 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.Version; import org.elasticsearch.ExceptionsHelper; @@ -59,6 +58,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -93,7 +93,9 @@ import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; public class StoreTests extends ESTestCase { @@ -761,7 +763,8 @@ public void testStoreStats() throws IOException { Settings settings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)).build(); - Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService, new DummyShardLock(shardId)); + Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService, + new DummyShardLock(shardId)); long initialStoreSize = 0; for (String extraFiles : store.directory().listAll()) { assertTrue("expected extraFS file but got: " + extraFiles, extraFiles.startsWith("extra")); @@ -1071,4 +1074,55 @@ public Directory newDirectory() throws IOException { store.close(); } + public void testEnsureIndexHasHistoryUUID() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + DirectoryService directoryService = new LuceneManagedDirectoryService(random()); + try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) { + + store.createEmpty(); + + // remove the history uuid + IndexWriterConfig iwc = new IndexWriterConfig(null) + .setCommitOnClose(false) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND); + try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) { + Map newCommitData = new HashMap<>(); + for (Map.Entry entry : writer.getLiveCommitData()) { + if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) { + newCommitData.put(entry.getKey(), entry.getValue()); + } + } + writer.setLiveCommitData(newCommitData.entrySet()); + writer.commit(); + } + + store.ensureIndexHasHistoryUUID(); + + SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY)); + } + } + + public void testHistoryUUIDCanBeForced() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + DirectoryService directoryService = new LuceneManagedDirectoryService(random()); + try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) { + + store.createEmpty(); + + SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY)); + final String oldHistoryUUID = segmentInfos.getUserData().get(Engine.HISTORY_UUID_KEY); + + store.bootstrapNewHistory(); + + segmentInfos = Lucene.readSegmentInfos(store.directory()); + assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY)); + assertThat(segmentInfos.getUserData().get(Engine.HISTORY_UUID_KEY), not(equalTo(oldHistoryUUID))); + } + } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index d1dbaf6bc89fe..661a1f0635430 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -32,7 +32,6 @@ import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -48,6 +47,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.SegmentsStats; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index c75e469f7aff4..f0e46cf022344 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -37,8 +37,6 @@ import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -51,6 +49,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -363,9 +362,14 @@ private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFact @Nullable BiFunction localCheckpointTrackerSupplier, @Nullable ToLongBiFunction seqNoForOperation, EngineConfig config) throws IOException { - final Directory directory = config.getStore().directory(); + final Store store = config.getStore(); + final Directory directory = store.directory(); if (Lucene.indexExists(directory) == false) { - EngineDiskUtils.createEmpty(directory, config.getTranslogConfig().getTranslogPath(), config.getShardId()); + store.createEmpty(); + final String translogUuid = + Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUuid); + } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); internalEngine.recoverFromTranslog(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index b5ea5fd4c0eab..6d6cc36d78b1b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -25,7 +25,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -46,6 +45,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings;