Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
Expand Down Expand Up @@ -244,6 +245,12 @@ public static BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient(
RatisHelper.createRetryPolicy(conf), tlsConfig, conf);
}

public static BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClientNoRetry(
ConfigurationSource conf) {
return (leader, tlsConfig) -> newRaftClient(getRpcType(conf), leader,
RetryPolicies.noRetry(), tlsConfig, conf);
}

public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
ConfigurationSource configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class ClosePipelineCommandHandler implements CommandHandler {
*/
public ClosePipelineCommandHandler(ConfigurationSource conf,
Executor executor) {
this(RatisHelper.newRaftClient(conf), executor);
this(RatisHelper.newRaftClientNoRetry(conf), executor);
}

/**
Expand Down Expand Up @@ -105,14 +106,16 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
try {
XceiverServerSpi server = ozoneContainer.getWriteChannel();
if (server.isExist(pipelineIdProto)) {
server.removeGroup(pipelineIdProto);
if (server instanceof XceiverServerRatis) {
// TODO: Refactor Ratis logic to XceiverServerRatis
// Propagate the group remove to the other Raft peers in the pipeline
XceiverServerRatis ratisServer = (XceiverServerRatis) server;
final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
final Collection<RaftPeer> peers = ratisServer.getRaftPeersInPipeline(pipelineID);
final boolean shouldDeleteRatisLogDirectory = ratisServer.getShouldDeleteRatisLogDirectory();
// This might throw GroupMismatchException if the Ratis group has been closed by other datanodes
final Collection<RaftPeer> peers = ratisServer.getRaftPeersInPipeline(pipelineID);
// Try to send remove group for the other datanodes first, ignoring GroupMismatchException
// if the Ratis group has been closed in the other datanodes
peers.stream()
.filter(peer -> !peer.getId().equals(ratisServer.getServer().getId()))
.forEach(peer -> {
Expand All @@ -122,19 +125,34 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
} catch (GroupMismatchException ae) {
// ignore silently since this means that the group has been closed by earlier close pipeline
// command in another datanode
LOG.debug("Failed to remove group {} for pipeline {} on peer {} since the group has " +
"been removed by earlier close pipeline command handled in another datanode", raftGroupId,
pipelineID, peer.getId());
} catch (IOException ioe) {
LOG.warn("Failed to remove group {} for peer {}", raftGroupId, peer.getId(), ioe);
LOG.warn("Failed to remove group {} of pipeline {} on peer {}",
raftGroupId, pipelineID, peer.getId(), ioe);
}
});
}
// Remove the Ratis group from the current datanode pipeline, might throw GroupMismatchException as
// well. It is a no-op for XceiverServerSpi implementations (e.g. XceiverServerGrpc)
server.removeGroup(pipelineIdProto);
LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
dn.getUuidString());
} else {
LOG.debug("Ignoring close pipeline command for pipeline {} " +
"as it does not exist", pipelineID);
LOG.debug("Ignoring close pipeline command for pipeline {} on datanode {} " +
"as it does not exist", pipelineID, dn.getUuidString());
}
} catch (IOException e) {
LOG.error("Can't close pipeline {}", pipelineID, e);
Throwable gme = HddsClientUtils.containsException(e, GroupMismatchException.class);
if (gme != null) {
// ignore silently since this means that the group has been closed by earlier close pipeline
// command in another datanode
LOG.debug("The group for pipeline {} on datanode {} has been removed by earlier close " +
"pipeline command handled in another datanode", pipelineID, dn.getUuidString());
} else {
LOG.error("Can't close pipeline {}", pipelineID, e);
}
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
Expand Down