Skip to content

Commit 888d4bf

Browse files
[segment replication] Introducing common Replication interfaces for segment replication and recovery code paths (#3234) (#3984)
* RecoveryState inherits from ReplicationState + RecoveryTarget inherits from ReplicationTarget
  Signed-off-by: Poojita Raj <[email protected]>
* Refactoring: mixedClusterVersion error fix + move Stage to ReplicationState
  Signed-off-by: Poojita Raj <[email protected]>
* pull ReplicationListener into a top level class + add javadocs + address review comments
  Signed-off-by: Poojita Raj <[email protected]>
* fix javadoc
  Signed-off-by: Poojita Raj <[email protected]>
* review changes
  Signed-off-by: Poojita Raj <[email protected]>
* Refactoring the hierarchy relationship between repl and recovery
  Signed-off-by: Poojita Raj <[email protected]>
* style fix
  Signed-off-by: Poojita Raj <[email protected]>
* move package common under replication
  Signed-off-by: Poojita Raj <[email protected]>
* rename to replication
  Signed-off-by: Poojita Raj <[email protected]>
* rename and doc changes
  Signed-off-by: Poojita Raj <[email protected]>

(cherry picked from commit a023ad9)
Co-authored-by: Poojita Raj <[email protected]>
1 parent fb4f96f commit 888d4bf

File tree

20 files changed

+750
-582
lines changed

20 files changed

+750
-582
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@
101101
import org.opensearch.indices.IndicesService;
102102
import org.opensearch.indices.NodeIndicesStats;
103103
import org.opensearch.indices.analysis.AnalysisModule;
104-
import org.opensearch.indices.recovery.RecoveryState.Stage;
105104
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
105+
import org.opensearch.indices.recovery.RecoveryState.Stage;
106106
import org.opensearch.node.NodeClosedException;
107107
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
108108
import org.opensearch.plugins.AnalysisPlugin;

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@
157157
import org.opensearch.indices.cluster.IndicesClusterStateService;
158158
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
159159
import org.opensearch.indices.recovery.RecoveryFailedException;
160+
import org.opensearch.indices.recovery.RecoveryListener;
160161
import org.opensearch.indices.recovery.RecoveryState;
161162
import org.opensearch.indices.recovery.RecoveryTarget;
162163
import org.opensearch.repositories.RepositoriesService;
@@ -2888,7 +2889,7 @@ protected Engine getEngineOrNull() {
28882889
public void startRecovery(
28892890
RecoveryState recoveryState,
28902891
PeerRecoveryTargetService recoveryTargetService,
2891-
PeerRecoveryTargetService.RecoveryListener recoveryListener,
2892+
RecoveryListener recoveryListener,
28922893
RepositoriesService repositoriesService,
28932894
Consumer<MappingMetadata> mappingUpdateConsumer,
28942895
IndicesService indicesService
@@ -2921,7 +2922,7 @@ public void startRecovery(
29212922
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
29222923
} catch (Exception e) {
29232924
failShard("corrupted preexisting index", e);
2924-
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
2925+
recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
29252926
}
29262927
break;
29272928
case SNAPSHOT:
@@ -2996,15 +2997,15 @@ public void startRecovery(
29962997
private void executeRecovery(
29972998
String reason,
29982999
RecoveryState recoveryState,
2999-
PeerRecoveryTargetService.RecoveryListener recoveryListener,
3000+
RecoveryListener recoveryListener,
30003001
CheckedConsumer<ActionListener<Boolean>, Exception> action
30013002
) {
30023003
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
30033004
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
30043005
if (r) {
3005-
recoveryListener.onRecoveryDone(recoveryState);
3006+
recoveryListener.onDone(recoveryState);
30063007
}
3007-
}, e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
3008+
}, e -> recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
30083009
}
30093010

30103011
/**

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@
136136
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
137137
import org.opensearch.indices.mapper.MapperRegistry;
138138
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
139+
import org.opensearch.indices.recovery.RecoveryListener;
139140
import org.opensearch.indices.recovery.RecoveryState;
140141
import org.opensearch.node.Node;
141142
import org.opensearch.plugins.IndexStorePlugin;
@@ -839,7 +840,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
839840
public IndexShard createShard(
840841
final ShardRouting shardRouting,
841842
final PeerRecoveryTargetService recoveryTargetService,
842-
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
843+
final RecoveryListener recoveryListener,
843844
final RepositoriesService repositoriesService,
844845
final Consumer<IndexShard.ShardFailure> onShardFailure,
845846
final Consumer<ShardId> globalCheckpointSyncer,

server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@
7878
import org.opensearch.indices.IndicesService;
7979
import org.opensearch.indices.recovery.PeerRecoverySourceService;
8080
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
81-
import org.opensearch.indices.recovery.RecoveryFailedException;
81+
import org.opensearch.indices.recovery.RecoveryListener;
8282
import org.opensearch.indices.recovery.RecoveryState;
83+
import org.opensearch.indices.replication.common.ReplicationState;
8384
import org.opensearch.repositories.RepositoriesService;
8485
import org.opensearch.search.SearchService;
8586
import org.opensearch.snapshots.SnapshotShardsService;
@@ -624,7 +625,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
624625
indicesService.createShard(
625626
shardRouting,
626627
recoveryTargetService,
627-
new RecoveryListener(shardRouting, primaryTerm),
628+
new RecoveryListener(shardRouting, primaryTerm, this),
628629
repositoriesService,
629630
failedShardHandler,
630631
globalCheckpointSyncer,
@@ -739,39 +740,16 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(
739740
return sourceNode;
740741
}
741742

742-
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
743-
744-
/**
745-
* ShardRouting with which the shard was created
746-
*/
747-
private final ShardRouting shardRouting;
748-
749-
/**
750-
* Primary term with which the shard was created
751-
*/
752-
private final long primaryTerm;
753-
754-
private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
755-
this.shardRouting = shardRouting;
756-
this.primaryTerm = primaryTerm;
757-
}
758-
759-
@Override
760-
public void onRecoveryDone(final RecoveryState state) {
761-
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
762-
}
763-
764-
@Override
765-
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
766-
handleRecoveryFailure(shardRouting, sendShardFailure, e);
767-
}
768-
}
769-
770743
// package-private for testing
771-
synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
744+
public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
772745
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
773746
}
774747

748+
public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
749+
RecoveryState RecState = (RecoveryState) state;
750+
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
751+
}
752+
775753
private void failAndRemoveShard(
776754
ShardRouting shardRouting,
777755
boolean sendShardFailure,
@@ -1004,7 +982,7 @@ U createIndex(IndexMetadata indexMetadata, List<IndexEventListener> builtInIndex
1004982
T createShard(
1005983
ShardRouting shardRouting,
1006984
PeerRecoveryTargetService recoveryTargetService,
1007-
PeerRecoveryTargetService.RecoveryListener recoveryListener,
985+
RecoveryListener recoveryListener,
1008986
RepositoriesService repositoriesService,
1009987
Consumer<IndexShard.ShardFailure> onShardFailure,
1010988
Consumer<ShardId> globalCheckpointSyncer,

0 commit comments

Comments
 (0)