diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index feb48ef85d1ba..d15de54c54e99 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -397,9 +397,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe assert indexShouldExists; store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); - final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm()); + indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); } else if (indexShouldExists) { if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { @@ -466,9 +466,9 @@ private void restore(final IndexShard indexShard, final Repository repository, f final Store store = indexShard.store(); store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); - final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm()); + indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openEngineAndRecoverFromTranslog(); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 66e3e4d5558d8..73ac8a65d3007 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1428,26 +1428,27 @@ public void bootstrapNewHistory() throws IOException { try { Map userData = readLastCommittedSegmentsInfo().getUserData(); final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); - bootstrapNewHistory(maxSeqNo); + final long localCheckpoint = Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + bootstrapNewHistory(localCheckpoint, maxSeqNo); } finally { metadataLock.writeLock().unlock(); } } /** - * Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint + * Marks an existing lucene index with a new history uuid and sets the given local checkpoint * as well as the maximum sequence number. - * This is used to make sure no existing shard will recovery from this index using ops based recovery. + * This is used to make sure no existing shard will recover from this index using ops based recovery. * @see SequenceNumbers#LOCAL_CHECKPOINT_KEY * @see SequenceNumbers#MAX_SEQ_NO */ - public void bootstrapNewHistory(long maxSeqNo) throws IOException { + public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException { metadataLock.writeLock().lock(); try (IndexWriter writer = newAppendingIndexWriter(directory, null)) { final Map map = new HashMap<>(); map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); - map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); updateCommitData(writer, map); } finally { metadataLock.writeLock().unlock(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 12a7fad466e29..2851f43b1b990 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; @@ -46,8 +45,6 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -62,7 +59,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; @@ -107,7 +103,6 @@ import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.store.StoreUtils; @@ -121,12 +116,8 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; -import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.FieldMaskingReader; @@ -143,7 +134,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -174,7 +164,6 @@ import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -2159,9 +2148,12 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc public void testRestoreShard() throws IOException { final IndexShard source = newStartedShard(true); - IndexShard target = newStartedShard(true); + IndexShard target = newStartedShard(true, Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build()); indexDoc(source, "_doc", "0"); + EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history + indexDoc(source, "_doc", "2"); if (randomBoolean()) { source.refresh("test"); } @@ -2197,16 +2189,18 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio } } })); - assertThat(target.getLocalCheckpoint(), equalTo(0L)); - assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L)); - assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L)); + assertThat(target.getLocalCheckpoint(), equalTo(2L)); + assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard( - target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L)); + target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L)); - assertDocs(target, "0"); + assertDocs(target, "0", "2"); - closeShards(source, target); + closeShard(source, false); + closeShards(target); } public void testSearcherWrapperIsUsed() throws IOException { @@ -3131,107 +3125,6 @@ private Result indexOnReplicaWithGaps( return new Result(localCheckpoint, max); } - /** A dummy repository for testing which just needs restore overridden */ - private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { - private final String indexName; - - RestoreOnlyRepository(String indexName) { - this.indexName = indexName; - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - } - - @Override - public RepositoryMetaData getMetadata() { - return null; - } - - @Override - public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { - return null; - } - - @Override - public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { - return null; - } - - @Override - public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { - return null; - } - - @Override - public RepositoryData getRepositoryData() { - Map> map = new HashMap<>(); - map.put(new IndexId(indexName, "blah"), emptySet()); - return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList()); - } - - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { - } - - @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, - int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState) { - return null; - } - - @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { - } - - @Override - public long getSnapshotThrottleTimeInNanos() { - return 0; - } - - @Override - public long getRestoreThrottleTimeInNanos() { - return 0; - } - - @Override - public String startVerification() { - return null; - } - - @Override - public void endVerification(String verificationToken) { - } - - @Override - public boolean isReadOnly() { - return false; - } - - @Override - public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus) { - } - - @Override - public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { - return null; - } - - @Override - public void verify(String verificationToken, DiscoveryNode localNode) { - } - } - public void testIsSearchIdle() throws Exception { Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java new file mode 100644 index 0000000000000..11bdfb7bcc741 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -0,0 +1,146 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.Collections.emptySet; +import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; + +/** A dummy repository for testing which just needs restore overridden */ +public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { + private final String indexName; + + public RestoreOnlyRepository(String indexName) { + this.indexName = indexName; + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + } + + @Override + public RepositoryMetaData getMetadata() { + return null; + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + return null; + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + return null; + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + return null; + } + + @Override + public RepositoryData getRepositoryData() { + Map> map = new HashMap<>(); + map.put(new IndexId(indexName, "blah"), emptySet()); + return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList()); + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState) { + return null; + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return 0; + } + + @Override + public String startVerification() { + return null; + } + + @Override + public void endVerification(String verificationToken) { + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + return null; + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java index eabb05a537ca7..ed3a836d2c506 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java +++ b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java @@ -52,6 +52,7 @@ public class BackgroundIndexer implements AutoCloseable { private final Logger logger = LogManager.getLogger(getClass()); final Thread[] writers; + final Client client; final CountDownLatch stopLatch; final CopyOnWriteArrayList failures; final AtomicBoolean stop = new AtomicBoolean(false); @@ -122,6 +123,7 @@ public BackgroundIndexer(final String index, final String type, final Client cli if (random == null) { random = RandomizedTest.getRandom(); } + this.client = client; useAutoGeneratedIDs = random.nextBoolean(); failures = new CopyOnWriteArrayList<>(); writers = new Thread[writerCount]; @@ -316,6 +318,10 @@ public void close() throws Exception { stop(); } + public Client getClient() { + return client; + } + /** * Returns the ID set of all documents indexed by this indexer run */ diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 84250baaeaa21..aa94071ac1d35 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -218,21 +218,17 @@ private void initiateFollowing( final PutFollowAction.Request request, final ActionListener listener) { assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT."; - activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()}, - request.waitForActiveShards(), request.timeout(), result -> { - if (result) { - FollowParameters parameters = request.getParameters(); - ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request(); - resumeFollowRequest.setFollowerIndex(request.getFollowerIndex()); - resumeFollowRequest.setParameters(new FollowParameters(parameters)); - client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap( - r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), - listener::onFailure - )); - } else { - listener.onResponse(new PutFollowAction.Response(true, false, false)); - } - }, listener::onFailure); + FollowParameters parameters = request.getParameters(); + ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request(); + resumeFollowRequest.setFollowerIndex(request.getFollowerIndex()); + resumeFollowRequest.setParameters(new FollowParameters(parameters)); + client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap( + r -> activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()}, + request.waitForActiveShards(), request.timeout(), result -> + listener.onResponse(new PutFollowAction.Response(true, result, r.isAcknowledged())), + listener::onFailure), + listener::onFailure + )); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 48ea50af9990b..4594712dcda94 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -44,6 +44,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -58,6 +59,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -94,6 +96,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.stream.Collectors; @@ -555,6 +559,70 @@ protected void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index fol }); } + /** + * Waits until at least a give number of document is visible for searchers + * + * @param numDocs number of documents to wait for + * @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. Will be first checked for documents indexed. + * This saves on unneeded searches. + * @return the actual number of docs seen. + */ + public long waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws InterruptedException { + // indexing threads can wait for up to ~1m before retrying when they first try to index into a shard which is not STARTED. + return waitForDocs(numDocs, 90, TimeUnit.SECONDS, indexer); + } + + /** + * Waits until at least a give number of document is visible for searchers + * + * @param numDocs number of documents to wait for + * @param maxWaitTime if not progress have been made during this time, fail the test + * @param maxWaitTimeUnit the unit in which maxWaitTime is specified + * @param indexer Will be first checked for documents indexed. + * This saves on unneeded searches. + * @return the actual number of docs seen. + */ + public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final BackgroundIndexer indexer) + throws InterruptedException { + final AtomicLong lastKnownCount = new AtomicLong(-1); + long lastStartCount = -1; + BooleanSupplier testDocs = () -> { + lastKnownCount.set(indexer.totalIndexedDocs()); + if (lastKnownCount.get() >= numDocs) { + try { + long count = indexer.getClient().prepareSearch() + .setTrackTotalHits(true) + .setSize(0) + .setQuery(QueryBuilders.matchAllQuery()) + .get() + .getHits().getTotalHits().value; + + if (count == lastKnownCount.get()) { + // no progress - try to refresh for the next time + indexer.getClient().admin().indices().prepareRefresh().get(); + } + lastKnownCount.set(count); + } catch (Exception e) { // count now acts like search and barfs if all shards failed... + logger.debug("failed to executed count", e); + return false; + } + logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs); + } else { + logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs); + } + return lastKnownCount.get() >= numDocs; + }; + + while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) { + if (lastStartCount == lastKnownCount.get()) { + // we didn't make any progress + fail("failed to reach " + numDocs + "docs"); + } + lastStartCount = lastKnownCount.get(); + } + return lastKnownCount.get(); + } + static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 28f845fe7d463..80bded6a5d1d3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -61,6 +60,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.SnapshotRestoreException; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; @@ -118,93 +118,85 @@ public void testFollowIndex() throws Exception { } else { firstBatchNumDocs = randomIntBetween(10, 64); } - final int flushPoint = (int) (firstBatchNumDocs * 0.75); logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); - BulkRequestBuilder bulkRequestBuilder = leaderClient().prepareBulk(); - for (int i = 0; i < flushPoint; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - IndexRequest indexRequest = new IndexRequest("index1", "doc", Integer.toString(i)) - .source(source, XContentType.JSON) - .timeout(TimeValue.timeValueSeconds(1)); - bulkRequestBuilder.add(indexRequest); - } - bulkRequestBuilder.get(); - - leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get(); - - // Index some docs after the flush that might be recovered in the normal index following operations - for (int i = flushPoint; i < firstBatchNumDocs; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); - } + try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, + randomIntBetween(1, 5))) { + waitForDocs(randomInt(firstBatchNumDocs), indexer); + leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get(); + waitForDocs(firstBatchNumDocs, indexer); + indexer.assertNoFailures(); - boolean waitOnAll = randomBoolean(); + boolean waitOnAll = randomBoolean(); - final PutFollowAction.Request followRequest; - if (waitOnAll) { - followRequest = putFollow("index1", "index2", ActiveShardCount.ALL); - } else { - followRequest = putFollow("index1", "index2", ActiveShardCount.ONE); - } - PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - assertTrue(response.isFollowIndexCreated()); - assertTrue(response.isFollowIndexShardsAcked()); - assertTrue(response.isIndexFollowingStarted()); - - ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true); - ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).actionGet().getIndices().get("index2"); - for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) { + final PutFollowAction.Request followRequest; if (waitOnAll) { - assertTrue(shardHealth.isPrimaryActive()); - assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards()); + followRequest = putFollow("index1", "index2", ActiveShardCount.ALL); } else { - assertTrue(shardHealth.isPrimaryActive()); + followRequest = putFollow("index1", "index2", ActiveShardCount.ONE); } - } - - final Map firstBatchNumDocsPerShard = new HashMap<>(); - final ShardStats[] firstBatchShardStats = - leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (final ShardStats shardStats : firstBatchShardStats) { - if (shardStats.getShardRouting().primary()) { - long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; - firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true); + ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).get().getIndices().get("index2"); + for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) { + if (waitOnAll) { + assertTrue(shardHealth.isPrimaryActive()); + assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards()); + } else { + assertTrue(shardHealth.isPrimaryActive()); + } } - } - assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } - for (int i = 0; i < firstBatchNumDocs; i++) { - assertBusy(assertExpectedDocumentRunnable(i)); - } + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); - pauseFollow("index2"); - followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get(); - final int secondBatchNumDocs = randomIntBetween(2, 64); - logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); - for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); - } + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } - final Map secondBatchNumDocsPerShard = new HashMap<>(); - final ShardStats[] secondBatchShardStats = - leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (final ShardStats shardStats : secondBatchShardStats) { - if (shardStats.getShardRouting().primary()) { - final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; - secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + pauseFollow("index2"); + followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get(); + final int secondBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); + indexer.continueIndexing(secondBatchNumDocs); + + final Map secondBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] secondBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : secondBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } } - } - assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); + assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); - for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { - assertBusy(assertExpectedDocumentRunnable(i)); + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } + pauseFollow("index2"); + assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } - pauseFollow("index2"); - assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } public void testFollowIndexWithConcurrentMappingChanges() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java index 8d3c0c3b472aa..1326f0ebc79bb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -5,27 +5,46 @@ */ package org.elasticsearch.xpack.ccr.index.engine; +import org.apache.lucene.store.IOContext; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.RestoreOnlyRepository; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrSettings; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.CountDownLatch; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class FollowEngineIndexShardTests extends IndexShardTestCase { @@ -76,4 +95,63 @@ public void testDoNotFillGaps() throws Exception { closeShards(indexShard); } + public void testRestoreShard() throws IOException { + final Settings sourceSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexShard source = newStartedShard(true, sourceSettings); + final Settings targetSettings = Settings.builder() + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + IndexShard target = newStartedShard(true, targetSettings, new FollowingEngineFactory()); + assertThat(IndexShardTestCase.getEngine(target), instanceOf(FollowingEngine.class)); + + indexDoc(source, "_doc", "0"); + EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(source)); + indexDoc(source, "_doc", "2"); + if (randomBoolean()) { + source.refresh("test"); + } + flushShard(source); // only flush source + ShardRouting routing = ShardRoutingHelper.initWithSameId(target.routingEntry(), + RecoverySource.ExistingStoreRecoverySource.INSTANCE); + final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID())); + routing = ShardRoutingHelper.newWithRestoreSource(routing, + new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); + target = reinitShard(target, routing); + Store sourceStore = source.store(); + Store targetStore = target.store(); + + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + target.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") { + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + try { + cleanLuceneIndex(targetStore.directory()); + for (String file : sourceStore.directory().listAll()) { + if (file.equals("write.lock") || file.startsWith("extra")) { + continue; + } + targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + })); + assertThat(target.getLocalCheckpoint(), equalTo(0L)); + assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); + IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); + + assertDocs(target, "0", "2"); + + closeShard(source, false); + closeShards(target); + } + } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 18e96619ec822..3e36fc5977491 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -128,7 +128,8 @@ protected void closeInternal() { snapshot.syncSnapshot(snapshotIndexCommit); // we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo(); - tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc()); + final long maxDoc = segmentInfos.totalMaxDoc(); + tempStore.bootstrapNewHistory(maxDoc, maxDoc); store.incRef(); try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) { IndexCommit indexCommit = reader.getIndexCommit();