diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 6982d41fbce5..d0fd0db12950 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.OptionalLong; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -34,7 +34,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -55,6 +55,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.RaftException; import org.apache.ratis.protocol.RaftClientReply; @@ -103,7 +104,8 @@ public static XceiverClientRatis newXceiverClientRatis( // Map to track commit index at every server private final ConcurrentHashMap commitInfoMap; - private XceiverClientMetrics metrics; + private final XceiverClientMetrics metrics + = XceiverClientManager.getXceiverClientMetrics(); /** * Constructs a client. @@ -117,31 +119,46 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, this.retryPolicy = retryPolicy; commitInfoMap = new ConcurrentHashMap<>(); this.tlsConfig = tlsConfig; - metrics = XceiverClientManager.getXceiverClientMetrics(); this.ozoneConfiguration = configuration; + + if (LOG.isTraceEnabled()) { + LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(), + new Throwable("TRACE")); + } + } + + private long updateCommitInfosMap(RaftClientReply reply) { + return Optional.ofNullable(reply) + .filter(RaftClientReply::isSuccess) + .map(RaftClientReply::getCommitInfos) + .map(this::updateCommitInfosMap) + .orElse(0L); } - private void updateCommitInfosMap( + private long updateCommitInfosMap( Collection commitInfoProtos) { // if the commitInfo map is empty, just update the commit indexes for each // of the servers + final Stream stream; if (commitInfoMap.isEmpty()) { - commitInfoProtos.forEach(proto -> commitInfoMap - .put(RatisHelper.toDatanodeId(proto.getServer()), - proto.getCommitIndex())); + stream = commitInfoProtos.stream().map(this::putCommitInfo); // In case the commit is happening 2 way, just update the commitIndex // for the servers which have been successfully updating the commit // indexes. This is important because getReplicatedMinCommitIndex() // should always return the min commit index out of the nodes which have // been replicating data successfully. } else { - commitInfoProtos.forEach(proto -> commitInfoMap + stream = commitInfoProtos.stream().map(proto -> commitInfoMap .computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()), - (address, index) -> { - index = proto.getCommitIndex(); - return index; - })); + (address, index) -> proto.getCommitIndex())); } + return stream.mapToLong(Long::longValue).min().orElse(0); + } + + private long putCommitInfo(RaftProtos.CommitInfoProto proto) { + final long index = proto.getCommitIndex(); + commitInfoMap.put(RatisHelper.toDatanodeId(proto.getServer()), index); + return index; } /** @@ -233,9 +250,8 @@ private CompletableFuture sendRequestAsync( // gets the minimum log index replicated to all servers @Override public long getReplicatedMinCommitIndex() { - OptionalLong minIndex = - commitInfoMap.values().parallelStream().mapToLong(v -> v).min(); - return minIndex.isPresent() ? minIndex.getAsLong() : 0; + return commitInfoMap.values().parallelStream() + .mapToLong(Long::longValue).min().orElse(0); } private void addDatanodetoReply(UUID address, XceiverClientReply reply) { @@ -244,51 +260,58 @@ private void addDatanodetoReply(UUID address, XceiverClientReply reply) { reply.addDatanode(builder.build()); } + private XceiverClientReply newWatchReply( + long watchIndex, Object reason, long replyIndex) { + LOG.debug("watchForCommit({}) returns {} {}", + watchIndex, reason, replyIndex); + final XceiverClientReply reply = new XceiverClientReply(null); + reply.setLogIndex(replyIndex); + return reply; + } + @Override public XceiverClientReply watchForCommit(long index) throws InterruptedException, ExecutionException, TimeoutException, IOException { - long commitIndex = getReplicatedMinCommitIndex(); - XceiverClientReply clientReply = new XceiverClientReply(null); - if (commitIndex >= index) { - // return the min commit index till which the log has been replicated to - // all servers - clientReply.setLogIndex(commitIndex); - return clientReply; + final long replicatedMin = getReplicatedMinCommitIndex(); + if (replicatedMin >= index) { + return newWatchReply(index, "replicatedMin", replicatedMin); } - RaftClientReply reply; + try { CompletableFuture replyFuture = getClient().async() .watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); - replyFuture.get(); + final RaftClientReply reply = replyFuture.get(); + final long updated = updateCommitInfosMap(reply); + Preconditions.checkState(updated >= index); + return newWatchReply(index, ReplicationLevel.ALL_COMMITTED, updated); } catch (Exception e) { Throwable t = HddsClientUtils.checkForException(e); LOG.warn("3 way commit failed on pipeline {}", pipeline, e); if (t instanceof GroupMismatchException) { throw e; } - reply = getClient().async() + final RaftClientReply reply = getClient().async() .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(); - List commitInfoProtoList = - reply.getCommitInfos().stream() - .filter(i -> i.getCommitIndex() < index) - .collect(Collectors.toList()); - commitInfoProtoList.parallelStream().forEach(proto -> { - UUID address = RatisHelper.toDatanodeId(proto.getServer()); - addDatanodetoReply(address, clientReply); - // since 3 way commit has failed, the updated map from now on will - // only store entries for those datanodes which have had successful - // replication. - commitInfoMap.remove(address); - LOG.info( - "Could not commit index {} on pipeline {} to all the nodes. " + - "Server {} has failed. Committed by majority.", - index, pipeline, address); - }); + final XceiverClientReply clientReply = newWatchReply( + index, ReplicationLevel.MAJORITY_COMMITTED, index); + reply.getCommitInfos().stream() + .filter(i -> i.getCommitIndex() < index) + .forEach(proto -> { + UUID address = RatisHelper.toDatanodeId(proto.getServer()); + addDatanodetoReply(address, clientReply); + // since 3 way commit has failed, the updated map from now on will + // only store entries for those datanodes which have had successful + // replication. + commitInfoMap.remove(address); + LOG.info( + "Could not commit index {} on pipeline {} to all the nodes. " + + "Server {} has failed. Committed by majority.", + index, pipeline, address); + }); + return clientReply; } - clientReply.setLogIndex(index); - return clientReply; } /**