diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index b1fe096a564c3..cef89e1ce7855 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -150,6 +151,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh final long version = indexResult.getVersion(); indexRequest.version(version); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); + indexRequest.seqNo(indexResult.getSeqNo()); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); @@ -173,6 +175,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh // update the request with the version so it will go to the replicas deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.version(deleteResult.getVersion()); + deleteRequest.seqNo(deleteResult.getSeqNo()); assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound()); @@ -182,6 +185,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh break; default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); } + // update the bulk item request because update request execution can mutate the bulk item request request.items()[requestIndex] = replicaRequest; if (operationResult == null) { // in case of noop update operation @@ -282,6 +286,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind final long version = updateOperationResult.getVersion(); indexRequest.version(version); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); + indexRequest.seqNo(updateOperationResult.getSeqNo()); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); } break; @@ -292,6 +297,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind // update the request with the version so it will go to the replicas deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.version(updateOperationResult.getVersion()); + deleteRequest.seqNo(updateOperationResult.getSeqNo()); assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); } break; @@ -342,6 +348,10 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest); break; } + assert (replicaRequest.request() instanceof IndexRequest + && ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) || + (replicaRequest.request() instanceof DeleteRequest + && ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO); // successful operation break; // out of retry loop } else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) { @@ -364,10 +374,10 @@ protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, I switch (docWriteRequest.opType()) { case CREATE: case INDEX: - operationResult = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica); + operationResult = executeIndexRequestOnReplica((IndexRequest) docWriteRequest, replica); break; case DELETE: - operationResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica); + operationResult = executeDeleteRequestOnReplica((DeleteRequest) docWriteRequest, replica); break; default: throw new IllegalStateException("Unexpected request operation type on replica: " diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 81f341090a59d..0d0d76c76919c 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -129,6 +129,7 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde // update the request with the version so it will go to the replicas request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); request.version(result.getVersion()); + request.seqNo(result.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); response = new DeleteResponse( primary.shardId(), diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 98e8484b18331..905b08f2bd3f8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -78,6 +78,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -86,6 +87,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.function.Supplier; public class InternalEngine extends Engine { @@ -175,9 +177,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { throw new IllegalArgumentException(openMode.toString()); } logger.trace("recovered [{}]", seqNoStats); - indexWriter = writer; seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); - translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint); + // norelease + /* + * We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means + * that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local + * checkpoint to the maximum sequence number in the commit (at the potential expense of correctness). + */ + while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) { + seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1); + } + indexWriter = writer; + translog = openTranslog(engineConfig, writer, () -> seqNoService().getLocalCheckpoint()); assert translog.getGeneration() != null; } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); @@ -412,7 +423,7 @@ private SearcherManager createSearcherManager() throws EngineException { @Override public GetResult get(Get get, Function searcherFactory) throws EngineException { - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); if (get.realtime()) { VersionValue versionValue = versionMap.getUnderLock(get.uid()); @@ -434,11 +445,28 @@ public GetResult get(Get get, Function searcherFactory) throws } } - private boolean checkVersionConflict( - final Operation op, - final long currentVersion, - final long expectedVersion, - final boolean deleted) { + /** + * Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if + * no conflicts are found, the optional return value is not present. + * + * @param the result type + * @param op the operation + * @param currentVersion the current version + * @param expectedVersion the expected version + * @param deleted {@code true} if the current version is not found or represents a delete + * @param onSuccess if there is a version conflict that can be ignored, the result of the operation + * @param onFailure if there is a version conflict that can not be ignored, the result of the operation + * @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value + * is not present + */ + private Optional checkVersionConflict( + final Operation op, + final long currentVersion, + final long expectedVersion, + final boolean deleted, + final Supplier onSuccess, + final Function onFailure) { + final T result; if (op.versionType() == VersionType.FORCE) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { // If index was created in 5.0 or later, 'force' is not allowed at all @@ -452,14 +480,22 @@ private boolean checkVersionConflict( if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { if (op.origin().isRecovery()) { // version conflict, but okay - return true; + result = onSuccess.get(); } else { // fatal version conflict - throw new VersionConflictEngineException(shardId, op.type(), op.id(), + final VersionConflictEngineException e = + new VersionConflictEngineException( + shardId, + op.type(), + op.id(), op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + result = onFailure.apply(e); } + + return Optional.of(result); + } else { + return Optional.empty(); } - return false; } private long checkDeletedAndGCed(VersionValue versionValue) { @@ -475,7 +511,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) { @Override public IndexResult index(Index index) { IndexResult result; - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); if (index.origin().isRecovery()) { // Don't throttle recovery operations @@ -573,7 +609,7 @@ private IndexResult innerIndex(Index index) throws IOException { assert assertSequenceNumber(index.origin(), index.seqNo()); final Translog.Location location; final long updatedVersion; - IndexResult indexResult = null; + long seqNo = index.seqNo(); try (Releasable ignored = acquireLock(index.uid())) { lastWriteNanos = index.startTime(); /* if we have an autoGeneratedID that comes into the engine we can potentially optimize @@ -638,27 +674,32 @@ private IndexResult innerIndex(Index index) throws IOException { } } final long expectedVersion = index.version(); - if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) { - // skip index operation because of version conflict on recovery - indexResult = new IndexResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, false); + final Optional checkVersionConflictResult = + checkVersionConflict( + index, + currentVersion, + expectedVersion, + deleted, + () -> new IndexResult(currentVersion, index.seqNo(), false), + e -> new IndexResult(e, currentVersion, index.seqNo())); + + final IndexResult indexResult; + if (checkVersionConflictResult.isPresent()) { + indexResult = checkVersionConflictResult.get(); } else { - final long seqNo; + // no version conflict if (index.origin() == Operation.Origin.PRIMARY) { - seqNo = seqNoService.generateSeqNo(); - } else { - seqNo = index.seqNo(); + seqNo = seqNoService().generateSeqNo(); } - updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); - index.parsedDoc().version().setLongValue(updatedVersion); - // Update the document's sequence number and primary term, the - // sequence number here is derived here from either the sequence - // number service if this is on the primary, or the existing - // document's sequence number if this is on the replica. The - // primary term here has already been set, see - // IndexShard.prepareIndex where the Engine.Index operation is - // created + /** + * Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence + * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The + * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. + */ index.parsedDoc().updateSeqID(seqNo, index.primaryTerm()); + updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); + index.parsedDoc().version().setLongValue(updatedVersion); if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { // document does not exists, we can optimize for create, but double check if assertions are running @@ -669,8 +710,8 @@ private IndexResult innerIndex(Index index) throws IOException { } indexResult = new IndexResult(updatedVersion, seqNo, deleted); location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY - ? translog.add(new Translog.Index(index, indexResult)) - : null; + ? translog.add(new Translog.Index(index, indexResult)) + : null; versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); indexResult.setTranslogLocation(location); } @@ -678,8 +719,8 @@ private IndexResult innerIndex(Index index) throws IOException { indexResult.freeze(); return indexResult; } finally { - if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo()); + if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoService().markSeqNoAsCompleted(seqNo); } } @@ -724,7 +765,7 @@ private static void update(final Term uid, final List doc @Override public DeleteResult delete(Delete delete) { DeleteResult result; - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: result = innerDelete(delete); @@ -748,7 +789,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException { final Translog.Location location; final long updatedVersion; final boolean found; - DeleteResult deleteResult = null; + long seqNo = delete.seqNo(); try (Releasable ignored = acquireLock(delete.uid())) { lastWriteNanos = delete.startTime(); final long currentVersion; @@ -764,32 +805,40 @@ private DeleteResult innerDelete(Delete delete) throws IOException { } final long expectedVersion = delete.version(); - if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { - // skip executing delete because of version conflict on recovery - deleteResult = new DeleteResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, true); + + final Optional result = + checkVersionConflict( + delete, + currentVersion, + expectedVersion, + deleted, + () -> new DeleteResult(expectedVersion, delete.seqNo(), true), + e -> new DeleteResult(e, expectedVersion, delete.seqNo())); + + final DeleteResult deleteResult; + if (result.isPresent()) { + deleteResult = result.get(); } else { - final long seqNo; if (delete.origin() == Operation.Origin.PRIMARY) { - seqNo = seqNoService.generateSeqNo(); - } else { - seqNo = delete.seqNo(); + seqNo = seqNoService().generateSeqNo(); } + updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); deleteResult = new DeleteResult(updatedVersion, seqNo, found); location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY - ? translog.add(new Translog.Delete(delete, deleteResult)) - : null; + ? translog.add(new Translog.Delete(delete, deleteResult)) + : null; versionMap.putUnderLock(delete.uid().bytes(), - new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); + new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); deleteResult.setTranslogLocation(location); } deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.freeze(); return deleteResult; } finally { - if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo()); + if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoService().markSeqNoAsCompleted(seqNo); } } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java index 18d15707e40e0..e8aa0cdeb89db 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java @@ -149,12 +149,14 @@ public synchronized long getCheckpoint() { * updates the global checkpoint on a replica shard (after it has been updated by the primary). */ synchronized void updateCheckpointOnReplica(long globalCheckpoint) { + /* + * The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary + * information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other + * replica shards). In these cases, the local knowledge of the global checkpoint could be higher than sync from the lagging primary. + */ if (this.globalCheckpoint <= globalCheckpoint) { this.globalCheckpoint = globalCheckpoint; logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint); - } else { - throw new IllegalArgumentException("global checkpoint from primary should never decrease. current [" + - this.globalCheckpoint + "], got [" + globalCheckpoint + "]"); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 84f35ebca43d9..5f59af04b509f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -312,9 +312,9 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler { - recoveryTarget.finalizeRecovery(); shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId()); + recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()); }); if (request.isPrimaryRelocation()) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 1ae1d494ca672..67ee1a5ac9ae3 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -333,7 +333,8 @@ public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAut } @Override - public void finalizeRecovery() { + public void finalizeRecovery(final long globalCheckpoint) { + indexShard().updateGlobalCheckpointOnReplica(globalCheckpoint); final IndexShard indexShard = indexShard(); indexShard.finalizeRecovery(); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 86afa498e625e..5cbfb4a53a5bf 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -39,11 +39,12 @@ public interface RecoveryTargetHandler { void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException; /** - * The finalize request clears unreferenced translog files, refreshes the engine now that - * new segments are available, and enables garbage collection of - * tombstone files. - **/ - void finalizeRecovery(); + * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and + * updates the global checkpoint. + * + * @param globalCheckpoint the global checkpoint on the recovery source + */ + void finalizeRecovery(long globalCheckpoint); /** * Blockingly waits for cluster state with at least clusterStateVersion to be available @@ -82,4 +83,5 @@ void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReferenc * @return the allocation id of the target shard. */ String getTargetAllocationId(); + } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index ef6b7c2f9011f..5fa1ca22c7065 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -86,9 +86,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAut } @Override - public void finalizeRecovery() { + public void finalizeRecovery(final long globalCheckpoint) { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE, - new RecoveryFinalizeRecoveryRequest(recoveryId, shardId), + new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 340ea745aae1b..22746aaf2a19f 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.filter.RegexFilter; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.Tokenizer; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Field; @@ -74,6 +73,7 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -130,7 +130,6 @@ import org.junit.After; import org.junit.Before; -import java.io.IOError; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -153,11 +152,15 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -319,12 +322,27 @@ protected InternalEngine createEngine(IndexSettings indexSettings, Store store, } protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, Supplier indexWriterSupplier) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterSupplier, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + Supplier indexWriterSupplier, + Supplier sequenceNumbersServiceSupplier) throws IOException { EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); InternalEngine internalEngine = new InternalEngine(config) { @Override IndexWriter createWriter(boolean create) throws IOException { return (indexWriterSupplier != null) ? indexWriterSupplier.get() : super.createWriter(create); } + + @Override + public SequenceNumbersService seqNoService() { + return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService(); + } }; if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); @@ -2914,6 +2932,193 @@ public void testSequenceIDs() throws Exception { searchResult.close(); } + public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws BrokenBarrierException, InterruptedException, IOException { + engine.close(); + final int docs = randomIntBetween(1, 32); + InternalEngine initialEngine = null; + try { + final CountDownLatch latch = new CountDownLatch(1); + final CyclicBarrier barrier = new CyclicBarrier(2); + final AtomicBoolean skip = new AtomicBoolean(); + final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); + final List threads = new ArrayList<>(); + final SequenceNumbersService seqNoService = + new SequenceNumbersService( + shardId, + defaultSettings, + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.UNASSIGNED_SEQ_NO) { + @Override + public long generateSeqNo() { + final long seqNo = super.generateSeqNo(); + if (skip.get()) { + try { + barrier.await(); + latch.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + if (expectedLocalCheckpoint.get() + 1 == seqNo) { + expectedLocalCheckpoint.set(seqNo); + } + } + return seqNo; + } + }; + initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService); + final InternalEngine finalInitialEngine = initialEngine; + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final Term uid = newUid(id); + final ParsedDocument doc = testParsedDocument(id, id, "test", null, testDocumentWithTextField(), SOURCE, null); + + skip.set(randomBoolean()); + final Thread thread = new Thread(() -> finalInitialEngine.index(new Engine.Index(uid, doc))); + thread.start(); + if (skip.get()) { + threads.add(thread); + barrier.await(); + } else { + thread.join(); + } + } + + assertThat(initialEngine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint.get())); + assertThat(initialEngine.seqNoService().getMaxSeqNo(), equalTo((long) (docs - 1))); + initialEngine.flush(true, true); + + latch.countDown(); + for (final Thread thread : threads) { + thread.join(); + } + } finally { + IOUtils.close(initialEngine); + } + + try (final Engine recoveringEngine = + new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); + } + } + + public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws IOException { + final long v = Versions.MATCH_ANY; + final VersionType t = VersionType.INTERNAL; + final long ts = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + final int docs = randomIntBetween(1, 32); + InternalEngine initialEngine = null; + try { + initialEngine = engine; + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final Term uid = newUid(id); + final ParsedDocument doc = testParsedDocument(id, id, "test", null, testDocumentWithTextField(), SOURCE, null); + // create a gap at sequence number 3 * i + 1 + initialEngine.index(new Engine.Index(uid, doc, 3 * i, 1, v, t, REPLICA, System.nanoTime(), ts, false)); + initialEngine.delete(new Engine.Delete("type", id, uid, 3 * i + 2, 1, v, t, REPLICA, System.nanoTime())); + } + + // bake the commit with the local checkpoint stuck at 0 and gaps all along the way up to the max sequence number + assertThat(initialEngine.seqNoService().getLocalCheckpoint(), equalTo((long) 0)); + assertThat(initialEngine.seqNoService().getMaxSeqNo(), equalTo((long) (3 * (docs - 1) + 2))); + initialEngine.flush(true, true); + + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final Term uid = newUid(id); + final ParsedDocument doc = testParsedDocument(id, id, "test", null, testDocumentWithTextField(), SOURCE, null); + initialEngine.index(new Engine.Index(uid, doc, 3 * i + 1, 1, v, t, REPLICA, System.nanoTime(), ts, false)); + } + } finally { + IOUtils.close(initialEngine); + } + + try (final Engine recoveringEngine = + new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1))); + } + } + + public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOException { + final List operations = new ArrayList<>(); + + final int numberOfOperations = randomIntBetween(16, 32); + final Term uid = newUid("1"); + final Document document = testDocumentWithTextField(); + final AtomicLong sequenceNumber = new AtomicLong(); + final Engine.Operation.Origin origin = randomFrom(LOCAL_TRANSLOG_RECOVERY, PEER_RECOVERY, PRIMARY, REPLICA); + final LongSupplier sequenceNumberSupplier = + origin == PRIMARY ? () -> SequenceNumbersService.UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement; + document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); + final ParsedDocument doc = testParsedDocument("1", "1", "test", null, document, B_1, null); + for (int i = 0; i < numberOfOperations; i++) { + if (randomBoolean()) { + final Engine.Index index = new Engine.Index( + uid, + doc, + sequenceNumberSupplier.getAsLong(), + 1, + i, + VersionType.EXTERNAL, + origin, + System.nanoTime(), + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, + false); + operations.add(index); + } else { + final Engine.Delete delete = new Engine.Delete( + "test", + "1", + uid, + sequenceNumberSupplier.getAsLong(), + 1, + i, + VersionType.EXTERNAL, + origin, + System.nanoTime()); + operations.add(delete); + } + } + + final boolean exists = operations.get(operations.size() - 1) instanceof Engine.Index; + Randomness.shuffle(operations); + + for (final Engine.Operation operation : operations) { + if (operation instanceof Engine.Index) { + engine.index((Engine.Index) operation); + } else { + engine.delete((Engine.Delete) operation); + } + } + + final long expectedLocalCheckpoint; + if (origin == PRIMARY) { + // we can only advance as far as the number of operations that did not conflict + int count = 0; + + // each time the version increments as we walk the list, that counts as a successful operation + long version = -1; + for (int i = 0; i < numberOfOperations; i++) { + if (operations.get(i).version() >= version) { + count++; + version = operations.get(i).version(); + } + } + + // sequence numbers start at zero, so the expected local checkpoint is the number of successful operations minus one + expectedLocalCheckpoint = count - 1; + } else { + expectedLocalCheckpoint = numberOfOperations - 1; + } + + assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); + try (final Engine.GetResult result = engine.get(new Engine.Get(true, uid))) { + assertThat(result.exists(), equalTo(exists)); + } + } + /** * Return a tuple representing the sequence ID for the given {@code Get} * operation. The first value in the tuple is the sequence number, the diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index bafe6350a51cd..93b20633cf152 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -109,13 +109,14 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa } @Override - public void finalizeRecovery() { + public void finalizeRecovery(long globalCheckpoint) { if (hasBlocked() == false) { // it maybe that not ops have been transferred, block now blockIfNeeded(RecoveryState.Stage.TRANSLOG); } blockIfNeeded(RecoveryState.Stage.FINALIZE); - super.finalizeRecovery(); + super.finalizeRecovery(globalCheckpoint); } + } } diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index f4de4bdea0bae..bbd1ad56bd0b3 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -23,14 +23,12 @@ import com.carrotsearch.hppc.procedures.IntProcedure; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.util.English; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -64,7 +62,6 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -98,8 +95,6 @@ @ClusterScope(scope = Scope.TEST, numDataNodes = 0) @TestLogging("_root:DEBUG,org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.index.shard.service:TRACE") -@LuceneTestCase.AwaitsFix(bugUrl = "primary relocation needs to transfer the global check point. otherwise the new primary sends a " + - "an unknown global checkpoint during sync, causing assertions to trigger") public class RelocationIT extends ESIntegTestCase { private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); @@ -183,15 +178,13 @@ public void testSimpleRelocationNoIndexing() { clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> verifying count again..."); client().admin().indices().prepareRefresh().execute().actionGet(); assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().totalHits(), equalTo(20L)); } - @TestLogging("action.index:TRACE,action.bulk:TRACE,action.search:TRACE") + @TestLogging("org.elasticsearch.action.index:TRACE,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.search:TRACE") public void testRelocationWhileIndexingRandom() throws Exception { int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4); int numberOfReplicas = randomBoolean() ? 0 : 1; @@ -210,12 +203,12 @@ public void testRelocationWhileIndexingRandom() throws Exception { ).get(); - for (int i = 1; i < numberOfNodes; i++) { - logger.info("--> starting [node{}] ...", i + 1); - nodes[i] = internalCluster().startNode(); - if (i != numberOfNodes - 1) { + for (int i = 2; i <= numberOfNodes; i++) { + logger.info("--> starting [node{}] ...", i); + nodes[i - 1] = internalCluster().startNode(); + if (i != numberOfNodes) { ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) - .setWaitForNodes(Integer.toString(i + 1)).setWaitForGreenStatus().execute().actionGet(); + .setWaitForNodes(Integer.toString(i)).setWaitForGreenStatus().execute().actionGet(); assertThat(healthResponse.isTimedOut(), equalTo(false)); } } @@ -246,8 +239,6 @@ public void testRelocationWhileIndexingRandom() throws Exception { } ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); indexer.pauseIndexing(); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); } @@ -261,7 +252,6 @@ public void testRelocationWhileIndexingRandom() throws Exception { logger.info("--> searching the index"); boolean ranOnce = false; for (int i = 0; i < 10; i++) { - try { logger.info("--> START search test round {}", i + 1); SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexer.totalIndexedDocs()).storedFields().execute().actionGet().getHits(); ranOnce = true; @@ -283,10 +273,7 @@ public void testRelocationWhileIndexingRandom() throws Exception { } assertThat(hits.totalHits(), equalTo(indexer.totalIndexedDocs())); logger.info("--> DONE search test round {}", i + 1); - } catch (SearchPhaseExecutionException ex) { - // TODO: the first run fails with this failure, waiting for relocating nodes set to 0 is not enough? - logger.warn("Got exception while searching.", ex); - } + } if (!ranOnce) { fail(); @@ -294,6 +281,7 @@ public void testRelocationWhileIndexingRandom() throws Exception { } } + @TestLogging("org.elasticsearch.action.index:TRACE,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.search:TRACE") public void testRelocationWhileRefreshing() throws Exception { int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4); int numberOfReplicas = randomBoolean() ? 0 : 1; @@ -464,6 +452,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO } } + @TestLogging("org.elasticsearch.action.index:TRACE,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.search:TRACE") public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException { int halfNodes = randomIntBetween(1, 3); Settings[] nodeSettings = Stream.concat( diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 7ff288f3a6baa..cb31ac6028bbf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1323,7 +1323,7 @@ public void indexRandom(boolean forceRefresh, List builders /** * Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either - * indexes they in a blocking or async fashion. This is very useful to catch problems that relate to internal document + * indexes them in a blocking or async fashion. This is very useful to catch problems that relate to internal document * ids or index segment creations. Some features might have bug when a given document is the first or the last in a * segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index * layout. @@ -1339,7 +1339,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List