diff --git a/CHANGELOG.md b/CHANGELOG.md index fb1bfc91c5361..951d5d200f90c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253)) - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402)) - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) +- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318)) ### Deprecated @@ -89,4 +90,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) [Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD -[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x \ No newline at end of file +[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 08159c44eb312..9185ef0d440ce 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -203,6 +203,7 @@ import java.util.stream.StreamSupport; import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; /** @@ -1704,12 +1705,7 @@ public void prepareForIndexRecovery() { * This is the first operation after the local checkpoint of the safe commit if exists. */ private long recoverLocallyUpToGlobalCheckpoint() { - assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; - if (state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); - } - recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); - assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + validateLocalRecoveryState(); final Optional safeCommit; final long globalCheckpoint; try { @@ -1806,21 +1802,17 @@ public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) { * @return the starting sequence number from which the recovery should start. */ private long recoverLocallyUptoLastCommit() { + assert isRemoteTranslogEnabled() : "Remote translog store is not enabled"; long seqNo; - assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; - if (state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); - } - recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); - assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + validateLocalRecoveryState(); try { - seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO)); + seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO)); } catch (org.apache.lucene.index.IndexNotFoundException e) { - logger.trace("skip local recovery as no index commit found"); + logger.error("skip local recovery as no index commit found", e); return UNASSIGNED_SEQ_NO; } catch (Exception e) { - logger.debug("skip local recovery as failed to find the safe commit", e); + logger.error("skip local recovery as failed to find the safe commit", e); return UNASSIGNED_SEQ_NO; } @@ -1829,12 +1821,21 @@ private long recoverLocallyUptoLastCommit() { recoveryState.setStage(RecoveryState.Stage.TRANSLOG); recoveryState.getTranslog().totalLocal(0); } catch (Exception e) { - logger.debug("check index failed during fetch seqNo", e); + logger.error("check index failed during fetch seqNo", e); return UNASSIGNED_SEQ_NO; } return seqNo; } + private void validateLocalRecoveryState() { + assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; + if (state != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, state); + } + recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); + assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + } + public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } @@ -2041,7 +2042,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t private boolean assertSequenceNumbersInCommit() throws IOException { final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; - assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; + assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid"; assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid [" + userData.get(Engine.HISTORY_UUID_KEY) diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 324d246adc88b..b5702431ed4bf 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -219,6 +219,12 @@ protected void reestablishRecovery(final StartRecoveryRequest request, final Str threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request)); } + /** + * Initiates recovery of the replica. TODO - Need to revisit it with PRRL and later. @see + * github issue on it. + * @param recoveryId recovery id + * @param preExistingRequest start recovery request + */ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) { final String actionName; final TransportRequest requestToSend; @@ -239,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); - final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; startRequest = getStartRecoveryRequest( @@ -247,7 +253,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi clusterService.localNode(), recoveryTarget, startingSeqNo, - remoteTranslogEnabled == false + !remoteTranslogEnabled ); requestToSend = startRequest; actionName = PeerRecoverySourceService.Actions.START_RECOVERY; diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 054aa6ad6eacc..1fcdfd79c544e 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -854,7 +854,7 @@ protected final void recoverUnstartedReplica( final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); IndexShard indexShard = recoveryTarget.indexShard(); boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); - final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled); final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode,