-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Replica starts peer recovery with safe commit #28181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
2509d17
9d7db40
c9152ea
2ff5354
d66ba4b
1eea7b1
9b25413
efb5149
2642d7e
3ef8bf5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -188,7 +188,9 @@ public RecoveryResponse recoverToTarget() throws IOException { | |
| runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); | ||
|
|
||
| try { | ||
| prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can roll back all these naming changes if we keep the old message (and the boolean). |
||
| // For a sequence based recovery, the target can keep its local translog | ||
| prepareTargetForTranslog(isSequenceNumberBasedRecoveryPossible == false, | ||
|
||
| translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); | ||
| } catch (final Exception e) { | ||
| throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); | ||
| } | ||
|
|
@@ -421,13 +423,13 @@ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogO | |
| } | ||
| } | ||
|
|
||
| void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { | ||
| void prepareTargetForTranslog(final boolean deleteLocalTranslog, final int totalTranslogOps) throws IOException { | ||
| StopWatch stopWatch = new StopWatch().start(); | ||
| logger.trace("recovery [phase1]: prepare remote engine for translog"); | ||
| final long startEngineStart = stopWatch.totalTime().millis(); | ||
| // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables | ||
| // garbage collection (not the JVM's GC!) of tombstone deletes. | ||
| cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps)); | ||
| cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(deleteLocalTranslog, totalTranslogOps)); | ||
| stopWatch.stop(); | ||
|
|
||
| response.startTime = stopWatch.totalTime().millis() - startEngineStart; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -362,10 +362,13 @@ private void ensureRefCount() { | |
| /*** Implementation of {@link RecoveryTargetHandler } */ | ||
|
|
||
| @Override | ||
| public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { | ||
| public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { | ||
| state().getTranslog().totalOperations(totalTranslogOps); | ||
| // TODO: take the local checkpoint from store as global checkpoint, once we know it's safe | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The TODO is still relevant, no?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I pushed it back as a note, as it's a valid TODO. |
||
| indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); | ||
| if (deleteLocalTranslog) { | ||
| indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); | ||
| } else { | ||
| indexShard().openIndexAndSkipTranslogRecovery(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,7 +31,9 @@ | |
| import org.elasticsearch.cluster.routing.ShardRouting; | ||
| import org.elasticsearch.common.bytes.BytesArray; | ||
| import org.elasticsearch.common.lucene.uid.Versions; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.xcontent.XContentType; | ||
| import org.elasticsearch.index.IndexSettings; | ||
| import org.elasticsearch.index.VersionType; | ||
| import org.elasticsearch.index.engine.Engine; | ||
| import org.elasticsearch.index.engine.EngineConfig; | ||
|
|
@@ -226,7 +228,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { | |
| final IndexShard oldPrimary = shards.getPrimary(); | ||
| final IndexShard newPrimary = shards.getReplicas().get(0); | ||
| final IndexShard replica = shards.getReplicas().get(1); | ||
| boolean expectSeqNoRecovery = true; | ||
| if (randomBoolean()) { | ||
| // simulate docs that were inflight when primary failed, these will be rolled back | ||
| final int rollbackDocs = randomIntBetween(1, 5); | ||
|
|
@@ -239,7 +240,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { | |
| } | ||
| if (randomBoolean()) { | ||
| oldPrimary.flush(new FlushRequest(index.getName())); | ||
| expectSeqNoRecovery = false; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -252,9 +252,29 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { | |
| equalTo(totalDocs - 1L)); | ||
|
|
||
| // index some more | ||
| totalDocs += shards.indexDocs(randomIntBetween(0, 5)); | ||
| int moreDocs = shards.indexDocs(randomIntBetween(0, 5)); | ||
| totalDocs += moreDocs; | ||
|
|
||
| // As a replica keeps a safe commit, the file-based recovery only happens if the required translog | ||
| // for the sequence based recovery are not fully retained and extra documents were added to the primary. | ||
| boolean expectSeqNoRecovery = (moreDocs == 0 || frequently()); | ||
|
||
| int uncommittedOpsOnPrimary = 0; | ||
| if (expectSeqNoRecovery == false) { | ||
| IndexMetaData.Builder builder = IndexMetaData.builder(newPrimary.indexSettings().getIndexMetaData()); | ||
| builder.settings(Settings.builder().put(newPrimary.indexSettings().getSettings()) | ||
| .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") | ||
| .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") | ||
| ); | ||
| newPrimary.indexSettings().updateIndexMetaData(builder.build()); | ||
| newPrimary.onSettingsChanged(); | ||
| shards.syncGlobalCheckpoint(); | ||
| newPrimary.flush(new FlushRequest()); | ||
| uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); | ||
| } | ||
|
|
||
| if (randomBoolean()) { | ||
| uncommittedOpsOnPrimary = 0; | ||
| shards.syncGlobalCheckpoint(); | ||
| newPrimary.flush(new FlushRequest()); | ||
| } | ||
|
|
||
|
|
@@ -269,7 +289,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { | |
| assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs)); | ||
| } else { | ||
| assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); | ||
| assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs)); | ||
| assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary)); | ||
| } | ||
|
|
||
| // roll back the extra ops in the replica | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know I came up with the name, and I'm sorry for changing my mind. I would propose
`createNewTranslog`. Better, no?