diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 9dc6af19353a..d4514b0b7e40 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -83,6 +83,7 @@
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -1162,8 +1163,8 @@ public void evictStateMachineCache() {
   }
 
   @Override
-  public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
-    ratisServer.handleNodeSlowness(gid, roleInfoProto);
+  public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) {
+    ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7899cdcc0e67..a4c143439852 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -104,6 +104,7 @@
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TraditionalBinaryPrefix;
@@ -161,19 +162,18 @@ private static long nextCallId() {
   private int clientPort;
   private int dataStreamPort;
   private final RaftServer server;
+  private final String name;
   private final List<ThreadPoolExecutor> chunkExecutors;
   private final ContainerDispatcher dispatcher;
   private final ContainerController containerController;
   private final ClientId clientId = ClientId.randomId();
   private final StateContext context;
-  private final long nodeFailureTimeoutMs;
   private boolean isStarted = false;
   private final DatanodeDetails datanodeDetails;
   private final ConfigurationSource conf;
   // TODO: Remove the gids set when Ratis supports an api to query active
   // pipelines
   private final ConcurrentMap<RaftGroupId, ActivePipelineContext> activePipelines = new ConcurrentHashMap<>();
-  private final RaftPeerId raftPeerId;
   // Timeout used while calling submitRequest directly.
   private final long requestTimeout;
   private final boolean shouldDeleteRatisLogDirectory;
@@ -197,14 +197,14 @@ private XceiverServerRatis(HddsDatanodeService hddsDatanodeService, DatanodeDeta
     this.context = context;
     this.dispatcher = dispatcher;
     this.containerController = containerController;
-    this.raftPeerId = RatisHelper.toRaftPeerId(dd);
     String threadNamePrefix = datanodeDetails.threadNamePrefix();
     chunkExecutors = createChunkExecutors(conf, threadNamePrefix);
-    nodeFailureTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
     shouldDeleteRatisLogDirectory =
         ratisServerConfig.shouldDeleteRatisLogDirectory();
 
     RaftProperties serverProperties = newRaftProperties();
+    final RaftPeerId raftPeerId = RatisHelper.toRaftPeerId(dd);
+    this.name = getClass().getSimpleName() + "(" + raftPeerId + ")";
     this.server =
         RaftServer.newBuilder().setServerId(raftPeerId)
             .setProperties(serverProperties)
@@ -474,7 +474,7 @@ private void setStateMachineDataConfigurations(RaftProperties properties) {
 
     // NOTE : the default value for the retry count in ratis is -1,
     // which means retry indefinitely.
-    int syncTimeoutRetryDefault = (int) nodeFailureTimeoutMs /
+    final int syncTimeoutRetryDefault = (int) ratisServerConfig.getFollowerSlownessTimeout() /
         dataSyncTimeout.toIntExact(TimeUnit.MILLISECONDS);
     int numSyncRetries = conf.getInt(
         OzoneConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
@@ -558,7 +558,7 @@ private static Parameters createTlsParameters(SecurityConfig conf,
   @Override
   public void start() throws IOException {
     if (!isStarted) {
-      LOG.info("Starting {} {}", getClass().getSimpleName(), server.getId());
+      LOG.info("Starting {}", name);
       for (ThreadPoolExecutor executor : chunkExecutors) {
         executor.prestartAllCoreThreads();
       }
@@ -581,11 +581,11 @@ public void start() throws IOException {
     }
   }
 
-  private int getRealPort(InetSocketAddress address, Port.Name name) {
+  private int getRealPort(InetSocketAddress address, Port.Name portName) {
     int realPort = address.getPort();
-    datanodeDetails.setPort(DatanodeDetails.newPort(name, realPort));
-    LOG.info("{} {} is started using port {} for {}",
-        getClass().getSimpleName(), server.getId(), realPort, name);
+    final Port port = DatanodeDetails.newPort(portName, realPort);
+    datanodeDetails.setPort(port);
+    LOG.info("{} is started using port {}", name, port);
     return realPort;
   }
 
@@ -593,7 +593,7 @@ private int getRealPort(InetSocketAddress address, Port.Name name) {
   public void stop() {
     if (isStarted) {
       try {
-        LOG.info("Stopping {} {}", getClass().getSimpleName(), server.getId());
+        LOG.info("Closing {}", name);
         // shutdown server before the executors as while shutting down,
         // some of the tasks would be executed using the executors.
         server.close();
@@ -602,7 +602,7 @@ public void stop() {
         }
         isStarted = false;
       } catch (IOException e) {
-        LOG.error("XceiverServerRatis Could not be stopped gracefully.", e);
+        LOG.error("Failed to close {}.", name, e);
       }
     }
   }
@@ -706,45 +706,50 @@ private GroupInfoRequest createGroupInfoRequest(
         nextCallId());
   }
 
-  private void handlePipelineFailure(RaftGroupId groupId,
-      RoleInfoProto roleInfoProto) {
-    String msg;
-    UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
-    RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+  /**
+   * Triggers a pipeline close for the given group with reason PIPELINE_FAILED.
+   * The close detail contains this server's name and datanode id, the given reason,
+   * the current role with its elapsed time, and role-specific information.
+   *
+   * @param groupId the group whose pipeline is closed
+   * @param roleInfoProto the role info; its self id must equal the id of this server
+   * @param reason a short description of why the pipeline failed
+   * @throws IllegalStateException if the role is not CANDIDATE, FOLLOWER or LEADER
+   */
+  private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoProto, String reason) {
+    final RaftPeerId raftPeerId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+    Preconditions.assertEquals(getServer().getId(), raftPeerId, "raftPeerId");
+    final StringBuilder b = new StringBuilder()
+        .append(name).append(" with datanodeId ").append(RatisHelper.toDatanodeId(raftPeerId))
+        .append(" handlePipelineFailure for ").append(reason)
+        .append(": ").append(roleInfoProto.getRole())
+        .append(" elapsed time=").append(roleInfoProto.getRoleElapsedTimeMs()).append("ms");
+
     switch (roleInfoProto.getRole()) {
     case CANDIDATE:
-      msg = datanode + " is in candidate state for " +
-          roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
+      final long lastLeaderElapsedTime = roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs();
+      b.append(", lastLeaderElapsedTime=").append(lastLeaderElapsedTime).append("ms");
       break;
     case FOLLOWER:
-      msg = datanode + " closes pipeline when installSnapshot from leader " +
-          "because leader snapshot doesn't contain any data to replay, " +
-          "all the log entries prior to the snapshot might have been purged." +
-          "So follower should not try to install snapshot from leader but" +
-          "can close the pipeline here. It's in follower state for " +
-          roleInfoProto.getRoleElapsedTimeMs() + "ms";
+      b.append(", outstandingOp=").append(roleInfoProto.getFollowerInfo().getOutstandingOp());
       break;
     case LEADER:
-      StringBuilder sb = new StringBuilder();
-      sb.append(datanode).append(" has not seen follower/s");
-      for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
-          .getFollowerInfoList()) {
-        if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
-          sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
-              .append(" for ").append(follower.getLastRpcElapsedTimeMs())
-              .append("ms");
-        }
+      final long followerSlownessTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
+      for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo().getFollowerInfoList()) {
+        final long lastRpcElapsedTimeMs = follower.getLastRpcElapsedTimeMs();
+        final boolean slow = lastRpcElapsedTimeMs > followerSlownessTimeoutMs;
+        final RaftPeerId followerId = RaftPeerId.valueOf(follower.getId().getId());
+        b.append("\n Follower ").append(followerId)
+            .append(" with datanodeId ").append(RatisHelper.toDatanodeId(followerId))
+            .append(" is ").append(slow ? "slow" : "responding")
+            .append(" with lastRpcElapsedTime=").append(lastRpcElapsedTimeMs).append("ms");
       }
-      msg = sb.toString();
       break;
     default:
-      LOG.error("unknown state: {}", roleInfoProto.getRole());
-      throw new IllegalStateException("node" + id + " is in illegal role "
-          + roleInfoProto.getRole());
+      throw new IllegalStateException("Unexpected role " + roleInfoProto.getRole());
     }
 
-    triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.PIPELINE_FAILED);
+    triggerPipelineClose(groupId, b.toString(), ClosePipelineInfo.Reason.PIPELINE_FAILED);
   }
 
   private void triggerPipelineClose(RaftGroupId groupId, String detail,
@@ -869,12 +874,12 @@ public void removeGroup(HddsProtos.PipelineID pipelineId)
     processReply(reply);
   }
 
-  void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
-    handlePipelineFailure(groupId, roleInfoProto);
+  void handleFollowerSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto, RaftPeer follower) {
+    handlePipelineFailure(groupId, roleInfoProto, "slow follower " + follower.getId());
   }
 
   void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
-    handlePipelineFailure(groupId, roleInfoProto);
+    handlePipelineFailure(groupId, roleInfoProto, "no leader");
   }
 
   void handleApplyTransactionFailure(RaftGroupId groupId,
@@ -901,10 +906,9 @@ void handleApplyTransactionFailure(RaftGroupId groupId,
 
   void handleInstallSnapshotFromLeader(RaftGroupId groupId,
       RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
-    LOG.warn("Install snapshot notification received from Leader with " +
-        "termIndex: {}, terminating pipeline: {}",
+    LOG.warn("handleInstallSnapshotFromLeader for firstTermIndexInLog={}, terminating pipeline: {}",
         firstTermIndexInLog, groupId);
-    handlePipelineFailure(groupId, roleInfoProto);
+    handlePipelineFailure(groupId, roleInfoProto, "install snapshot notification");
   }
 
   /**
@@ -950,7 +954,7 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
     LOG.info("Leader change notification received for group: {} with new " +
         "leaderId: {}", groupMemberId.getGroupId(), raftPeerId1);
     // Save the reported leader to be sent with the report to SCM
-    boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1);
+    final boolean leaderForGroup = server.getId().equals(raftPeerId1);
     activePipelines.compute(groupMemberId.getGroupId(),
         (key, value) -> value == null ? new ActivePipelineContext(leaderForGroup, false) :
             new ActivePipelineContext(leaderForGroup, value.isPendingClose()));