Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -34,7 +34,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
Expand All @@ -55,6 +55,7 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.RaftClientReply;
Expand Down Expand Up @@ -103,7 +104,8 @@ public static XceiverClientRatis newXceiverClientRatis(
// Map to track commit index at every server
private final ConcurrentHashMap<UUID, Long> commitInfoMap;

private XceiverClientMetrics metrics;
private final XceiverClientMetrics metrics
= XceiverClientManager.getXceiverClientMetrics();

/**
* Constructs a client.
Expand All @@ -117,31 +119,46 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
this.retryPolicy = retryPolicy;
commitInfoMap = new ConcurrentHashMap<>();
this.tlsConfig = tlsConfig;
metrics = XceiverClientManager.getXceiverClientMetrics();
this.ozoneConfiguration = configuration;

if (LOG.isTraceEnabled()) {
LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
new Throwable("TRACE"));
}
}

/**
 * Updates the commit info map from the commit infos carried by a reply.
 *
 * @param reply the reply of a raft client request, may be null
 * @return the value returned by
 *         {@link #updateCommitInfosMap(Collection)} for the commit infos of
 *         the reply; 0 if the reply is null, is not successful, or carries
 *         no commit infos
 */
private long updateCommitInfosMap(RaftClientReply reply) {
  // Only a successful reply is used to update the map.
  if (reply == null || !reply.isSuccess()) {
    return 0L;
  }
  final Collection<RaftProtos.CommitInfoProto> infos = reply.getCommitInfos();
  if (infos == null) {
    return 0L;
  }
  return updateCommitInfosMap(infos);
}

private void updateCommitInfosMap(
private long updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
// if the commitInfo map is empty, just update the commit indexes for each
// of the servers
final Stream<Long> stream;
if (commitInfoMap.isEmpty()) {
commitInfoProtos.forEach(proto -> commitInfoMap
.put(RatisHelper.toDatanodeId(proto.getServer()),
proto.getCommitIndex()));
stream = commitInfoProtos.stream().map(this::putCommitInfo);
// In case the commit is happening 2 way, just update the commitIndex
// for the servers which have been successfully updating the commit
// indexes. This is important because getReplicatedMinCommitIndex()
// should always return the min commit index out of the nodes which have
// been replicating data successfully.
} else {
commitInfoProtos.forEach(proto -> commitInfoMap
stream = commitInfoProtos.stream().map(proto -> commitInfoMap
.computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
(address, index) -> {
index = proto.getCommitIndex();
return index;
}));
(address, index) -> proto.getCommitIndex()));
}
return stream.mapToLong(Long::longValue).min().orElse(0);
}

/**
 * Records the commit index reported by a single server in
 * {@code commitInfoMap}, replacing any previous value for that server.
 *
 * @param proto the commit info of one server
 * @return the commit index that was stored
 */
private long putCommitInfo(RaftProtos.CommitInfoProto proto) {
  final long commitIndex = proto.getCommitIndex();
  final UUID datanodeId = RatisHelper.toDatanodeId(proto.getServer());
  commitInfoMap.put(datanodeId, commitIndex);
  return commitIndex;
}

/**
Expand Down Expand Up @@ -233,9 +250,8 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
/**
 * Gets the minimum log index replicated to all servers.
 *
 * @return the smallest commit index currently recorded in
 *         {@code commitInfoMap}, or 0 if the map is empty
 */
@Override
public long getReplicatedMinCommitIndex() {
  // A sequential stream is enough to take the minimum of the recorded values.
  return commitInfoMap.values().stream()
      .mapToLong(Long::longValue).min().orElse(0);
}

private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
Expand All @@ -244,51 +260,58 @@ private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
reply.addDatanode(builder.build());
}

/**
 * Builds the reply returned by {@code watchForCommit} and logs, at debug
 * level, why it is being returned.
 *
 * @param watchIndex the index that was being watched (used for logging only)
 * @param reason     what satisfied the watch (used for logging only)
 * @param replyIndex the log index to set on the returned reply
 * @return a new reply, constructed with a null response, whose log index is
 *         {@code replyIndex}
 */
private XceiverClientReply newWatchReply(
    long watchIndex, Object reason, long replyIndex) {
  LOG.debug("watchForCommit({}) returns {} {}",
      watchIndex, reason, replyIndex);
  final XceiverClientReply watchReply = new XceiverClientReply(null);
  watchReply.setLogIndex(replyIndex);
  return watchReply;
}

@Override
public XceiverClientReply watchForCommit(long index)
throws InterruptedException, ExecutionException, TimeoutException,
IOException {
long commitIndex = getReplicatedMinCommitIndex();
XceiverClientReply clientReply = new XceiverClientReply(null);
if (commitIndex >= index) {
// return the min commit index till which the log has been replicated to
// all servers
clientReply.setLogIndex(commitIndex);
return clientReply;
final long replicatedMin = getReplicatedMinCommitIndex();
if (replicatedMin >= index) {
return newWatchReply(index, "replicatedMin", replicatedMin);
}
RaftClientReply reply;

try {
CompletableFuture<RaftClientReply> replyFuture = getClient().async()
.watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
replyFuture.get();
final RaftClientReply reply = replyFuture.get();
final long updated = updateCommitInfosMap(reply);
Preconditions.checkState(updated >= index);
return newWatchReply(index, ReplicationLevel.ALL_COMMITTED, updated);
} catch (Exception e) {
Throwable t = HddsClientUtils.checkForException(e);
LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
if (t instanceof GroupMismatchException) {
throw e;
}
reply = getClient().async()
final RaftClientReply reply = getClient().async()
.watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get();
List<RaftProtos.CommitInfoProto> commitInfoProtoList =
reply.getCommitInfos().stream()
.filter(i -> i.getCommitIndex() < index)
.collect(Collectors.toList());
commitInfoProtoList.parallelStream().forEach(proto -> {
UUID address = RatisHelper.toDatanodeId(proto.getServer());
addDatanodetoReply(address, clientReply);
// since 3 way commit has failed, the updated map from now on will
// only store entries for those datanodes which have had successful
// replication.
commitInfoMap.remove(address);
LOG.info(
"Could not commit index {} on pipeline {} to all the nodes. " +
"Server {} has failed. Committed by majority.",
index, pipeline, address);
});
final XceiverClientReply clientReply = newWatchReply(
index, ReplicationLevel.MAJORITY_COMMITTED, index);
reply.getCommitInfos().stream()
.filter(i -> i.getCommitIndex() < index)
.forEach(proto -> {
UUID address = RatisHelper.toDatanodeId(proto.getServer());
addDatanodetoReply(address, clientReply);
// since 3 way commit has failed, the updated map from now on will
// only store entries for those datanodes which have had successful
// replication.
commitInfoMap.remove(address);
LOG.info(
"Could not commit index {} on pipeline {} to all the nodes. " +
"Server {} has failed. Committed by majority.",
index, pipeline, address);
});
return clientReply;
}
clientReply.setLogIndex(index);
return clientReply;
}

/**
Expand Down