-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Tighten sequence numbers recovery #22212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
3164819
1c71393
3c37f4b
f57eb99
2a8d069
b9f68d4
3935af2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,6 +78,7 @@ | |
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
@@ -86,6 +87,7 @@ | |
| import java.util.concurrent.locks.ReentrantLock; | ||
| import java.util.function.Function; | ||
| import java.util.function.LongSupplier; | ||
| import java.util.function.Supplier; | ||
|
|
||
| public class InternalEngine extends Engine { | ||
|
|
||
|
|
@@ -175,9 +177,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { | |
| throw new IllegalArgumentException(openMode.toString()); | ||
| } | ||
| logger.trace("recovered [{}]", seqNoStats); | ||
| indexWriter = writer; | ||
| seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); | ||
| translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint); | ||
| // norelease | ||
| /* | ||
| * We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means | ||
| * that there might be operations greater than the local checkpoint that will not be replayed. Here we force the local | ||
| * checkpoint to the maximum sequence number in the commit (at the potential expense of correctness). | ||
| */ | ||
| while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) { | ||
| seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1); | ||
| } | ||
| indexWriter = writer; | ||
| translog = openTranslog(engineConfig, writer, () -> seqNoService().getLocalCheckpoint()); | ||
| assert translog.getGeneration() != null; | ||
| } catch (IOException | TranslogCorruptedException e) { | ||
| throw new EngineCreationFailureException(shardId, "failed to create engine", e); | ||
|
|
@@ -412,7 +423,7 @@ private SearcherManager createSearcherManager() throws EngineException { | |
|
|
||
| @Override | ||
| public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException { | ||
| try (ReleasableLock lock = readLock.acquire()) { | ||
| try (ReleasableLock ignored = readLock.acquire()) { | ||
| ensureOpen(); | ||
| if (get.realtime()) { | ||
| VersionValue versionValue = versionMap.getUnderLock(get.uid()); | ||
|
|
@@ -434,11 +445,28 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws | |
| } | ||
| } | ||
|
|
||
| private boolean checkVersionConflict( | ||
| final Operation op, | ||
| final long currentVersion, | ||
| final long expectedVersion, | ||
| final boolean deleted) { | ||
| /** | ||
| * Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if | ||
| * no conflicts are found, the optional return value is not present. | ||
| * | ||
| * @param <T> the result type | ||
| * @param op the operation | ||
| * @param currentVersion the current version | ||
| * @param expectedVersion the expected version | ||
| * @param deleted {@code true} if the current version is not found or represents a delete | ||
| * @param onSuccess if there is a version conflict that can be ignored, the result of the operation | ||
| * @param onFailure if there is a version conflict that can not be ignored, the result of the operation | ||
| * @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value | ||
| * is not present | ||
| */ | ||
| private <T extends Result> Optional<T> checkVersionConflict( | ||
| final Operation op, | ||
| final long currentVersion, | ||
| final long expectedVersion, | ||
| final boolean deleted, | ||
| final Supplier<T> onSuccess, | ||
| final Function<VersionConflictEngineException, T> onFailure) { | ||
| final T result; | ||
| if (op.versionType() == VersionType.FORCE) { | ||
| if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { | ||
| // If index was created in 6.0.0-alpha1 or later, 'force' is not allowed at all | ||
|
|
@@ -452,14 +480,22 @@ private boolean checkVersionConflict( | |
| if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { | ||
| if (op.origin().isRecovery()) { | ||
| // version conflict, but okay | ||
| return true; | ||
| result = onSuccess.get(); | ||
| } else { | ||
| // fatal version conflict | ||
| throw new VersionConflictEngineException(shardId, op.type(), op.id(), | ||
| final VersionConflictEngineException e = | ||
| new VersionConflictEngineException( | ||
| shardId, | ||
| op.type(), | ||
| op.id(), | ||
| op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); | ||
| result = onFailure.apply(e); | ||
| } | ||
|
|
||
| return Optional.of(result); | ||
| } else { | ||
| return Optional.empty(); | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private long checkDeletedAndGCed(VersionValue versionValue) { | ||
|
|
@@ -475,7 +511,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) { | |
| @Override | ||
| public IndexResult index(Index index) { | ||
| IndexResult result; | ||
| try (ReleasableLock lock = readLock.acquire()) { | ||
| try (ReleasableLock ignored = readLock.acquire()) { | ||
| ensureOpen(); | ||
| if (index.origin().isRecovery()) { | ||
| // Don't throttle recovery operations | ||
|
|
@@ -574,6 +610,7 @@ private IndexResult innerIndex(Index index) throws IOException { | |
| final Translog.Location location; | ||
| final long updatedVersion; | ||
| IndexResult indexResult = null; | ||
|
||
| long seqNo = index.seqNo(); | ||
| try (Releasable ignored = acquireLock(index.uid())) { | ||
| lastWriteNanos = index.startTime(); | ||
| /* if we have an autoGeneratedID that comes into the engine we can potentially optimize | ||
|
|
@@ -638,27 +675,31 @@ private IndexResult innerIndex(Index index) throws IOException { | |
| } | ||
| } | ||
| final long expectedVersion = index.version(); | ||
| if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) { | ||
| // skip index operation because of version conflict on recovery | ||
| indexResult = new IndexResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, false); | ||
| final Optional<IndexResult> checkVersionConflictResult = | ||
| checkVersionConflict( | ||
| index, | ||
| currentVersion, | ||
| expectedVersion, | ||
| deleted, | ||
| () -> new IndexResult(currentVersion, index.seqNo(), false), | ||
| e -> new IndexResult(e, currentVersion, index.seqNo())); | ||
|
|
||
| if (checkVersionConflictResult.isPresent()) { | ||
| indexResult = checkVersionConflictResult.get(); | ||
| } else { | ||
| final long seqNo; | ||
| // no version conflict | ||
| if (index.origin() == Operation.Origin.PRIMARY) { | ||
| seqNo = seqNoService.generateSeqNo(); | ||
| } else { | ||
| seqNo = index.seqNo(); | ||
| seqNo = seqNoService().generateSeqNo(); | ||
| } | ||
| updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); | ||
| index.parsedDoc().version().setLongValue(updatedVersion); | ||
|
|
||
| // Update the document's sequence number and primary term, the | ||
| // sequence number here is derived here from either the sequence | ||
| // number service if this is on the primary, or the existing | ||
| // document's sequence number if this is on the replica. The | ||
| // primary term here has already been set, see | ||
| // IndexShard.prepareIndex where the Engine.Index operation is | ||
| // created | ||
| /** | ||
| * Update the document's sequence number and primary term; the sequence number here is derived from either the sequence | ||
| * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The | ||
| * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. | ||
| */ | ||
| index.parsedDoc().updateSeqID(seqNo, index.primaryTerm()); | ||
| updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); | ||
| index.parsedDoc().version().setLongValue(updatedVersion); | ||
|
|
||
| if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { | ||
| // document does not exist, we can optimize for create, but double check if assertions are running | ||
|
|
@@ -669,17 +710,17 @@ private IndexResult innerIndex(Index index) throws IOException { | |
| } | ||
| indexResult = new IndexResult(updatedVersion, seqNo, deleted); | ||
| location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY | ||
| ? translog.add(new Translog.Index(index, indexResult)) | ||
| : null; | ||
| ? translog.add(new Translog.Index(index, indexResult)) | ||
| : null; | ||
| versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); | ||
| indexResult.setTranslogLocation(location); | ||
| } | ||
| indexResult.setTook(System.nanoTime() - index.startTime()); | ||
| indexResult.freeze(); | ||
| return indexResult; | ||
| } finally { | ||
| if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
| seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo()); | ||
| if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
| seqNoService().markSeqNoAsCompleted(seqNo); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -724,7 +765,7 @@ private static void update(final Term uid, final List<ParseContext.Document> doc | |
| @Override | ||
| public DeleteResult delete(Delete delete) { | ||
| DeleteResult result; | ||
| try (ReleasableLock lock = readLock.acquire()) { | ||
| try (ReleasableLock ignored = readLock.acquire()) { | ||
| ensureOpen(); | ||
| // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: | ||
| result = innerDelete(delete); | ||
|
|
@@ -749,6 +790,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException { | |
| final long updatedVersion; | ||
| final boolean found; | ||
| DeleteResult deleteResult = null; | ||
| long seqNo = delete.seqNo(); | ||
| try (Releasable ignored = acquireLock(delete.uid())) { | ||
| lastWriteNanos = delete.startTime(); | ||
| final long currentVersion; | ||
|
|
@@ -764,32 +806,39 @@ private DeleteResult innerDelete(Delete delete) throws IOException { | |
| } | ||
|
|
||
| final long expectedVersion = delete.version(); | ||
| if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { | ||
| // skip executing delete because of version conflict on recovery | ||
| deleteResult = new DeleteResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, true); | ||
|
|
||
| final Optional<DeleteResult> result = | ||
| checkVersionConflict( | ||
| delete, | ||
| currentVersion, | ||
| expectedVersion, | ||
| deleted, | ||
| () -> new DeleteResult(expectedVersion, delete.seqNo(), true), | ||
| e -> new DeleteResult(e, expectedVersion, delete.seqNo())); | ||
|
|
||
| if (result.isPresent()) { | ||
| deleteResult = result.get(); | ||
| } else { | ||
| final long seqNo; | ||
| if (delete.origin() == Operation.Origin.PRIMARY) { | ||
| seqNo = seqNoService.generateSeqNo(); | ||
| } else { | ||
| seqNo = delete.seqNo(); | ||
| seqNo = seqNoService().generateSeqNo(); | ||
| } | ||
|
|
||
| updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); | ||
| found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); | ||
| deleteResult = new DeleteResult(updatedVersion, seqNo, found); | ||
| location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY | ||
| ? translog.add(new Translog.Delete(delete, deleteResult)) | ||
| : null; | ||
| ? translog.add(new Translog.Delete(delete, deleteResult)) | ||
| : null; | ||
| versionMap.putUnderLock(delete.uid().bytes(), | ||
| new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); | ||
| new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); | ||
| deleteResult.setTranslogLocation(location); | ||
| } | ||
| deleteResult.setTook(System.nanoTime() - delete.startTime()); | ||
| deleteResult.freeze(); | ||
| return deleteResult; | ||
| } finally { | ||
| if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
| seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo()); | ||
| if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
| seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo()); | ||
|
||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe assert at the end of this method that the seqNo is set on the replica request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 1c71393.