Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.analysis.AnalysisModule;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -2876,7 +2877,7 @@ protected Engine getEngineOrNull() {
public void startRecovery(
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<MappingMetadata> mappingUpdateConsumer,
IndicesService indicesService
Expand Down Expand Up @@ -2909,7 +2910,7 @@ public void startRecovery(
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
} catch (Exception e) {
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
break;
case SNAPSHOT:
Expand Down Expand Up @@ -2984,15 +2985,15 @@ public void startRecovery(
private void executeRecovery(
String reason,
RecoveryState recoveryState,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RecoveryListener recoveryListener,
CheckedConsumer<ActionListener<Boolean>, Exception> action
) {
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
recoveryListener.onDone(recoveryState);
}
}, e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}, e -> recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
Expand Down Expand Up @@ -839,7 +840,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
public IndexShard createShard(
final ShardRouting shardRouting,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.PeerRecoverySourceService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
import org.opensearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -624,7 +625,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
indicesService.createShard(
shardRouting,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm),
new RecoveryListener(shardRouting, primaryTerm, this),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
Expand Down Expand Up @@ -739,39 +740,16 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(
return sourceNode;
}

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {

/**
* ShardRouting with which the shard was created
*/
private final ShardRouting shardRouting;

/**
* Primary term with which the shard was created
*/
private final long primaryTerm;

private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
}

@Override
public void onRecoveryDone(final RecoveryState state) {
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}

@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}

// package-private for testing
synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
}

public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState RecState = (RecoveryState) state;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can change the method parameter type to RecoveryState, and this casting can be removed

shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}

private void failAndRemoveShard(
ShardRouting shardRouting,
boolean sendShardFailure,
Expand Down Expand Up @@ -1004,7 +982,7 @@ U createIndex(IndexMetadata indexMetadata, List<IndexEventListener> builtInIndex
T createShard(
ShardRouting shardRouting,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer,
Expand Down
Loading