File: XceiverClientGrpc.java
@@ -31,12 +31,14 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.CheckedFunction;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;

@@ -83,15 +85,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
* data nodes.
*
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
* @param config -- Ozone Config
Review comment (Member), suggested change: "* @param config -- Ozone Config" -> "* @param config - Ozone Config"

*/
public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
super();
Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config);
this.pipeline = pipeline;
this.config = config;
this.secConfig = new SecurityConfig(config);
this.secConfig = new SecurityConfig(config);
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
@@ -101,9 +103,8 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {

/**
* To be used when grpc token is not enabled.
* */
@Override
public void connect() throws Exception {
*/
@Override public void connect() throws Exception {
Review comment (Member): What's the purpose of modifying it?

// leader by default is the 1st datanode in the datanode list of pipleline
DatanodeDetails dn = this.pipeline.getFirstNode();
// just make a connection to the 1st datanode at the beginning
@@ -112,9 +113,8 @@ public void connect() throws Exception {

/**
* Passed encoded token to GRPC header when security is enabled.
* */
@Override
public void connect(String encodedToken) throws Exception {
*/
@Override public void connect(String encodedToken) throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
DatanodeDetails dn = this.pipeline.getFirstNode();
// just make a connection to the 1st datanode at the beginning
@@ -132,11 +132,10 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
}

// Add credential context to the client call
String userName = UserGroupInformation.getCurrentUser()
.getShortUserName();
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
LOG.debug("Connecting to server Port : " + dn.getIpAddress());
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
.getIpAddress(), port).usePlaintext()
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.intercept(new ClientCredentialInterceptor(userName, encodedToken),
new GrpcClientInterceptor());
@@ -149,8 +148,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
if (trustCertCollectionFile != null) {
sslContextBuilder.trustManager(trustCertCollectionFile);
}
if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null &&
privateKeyFile != null) {
if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null
&& privateKeyFile != null) {
sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile);
}

@@ -174,17 +173,15 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
*
* @return True if the connection is alive, false otherwise.
*/
@VisibleForTesting
public boolean isConnected(DatanodeDetails details) {
@VisibleForTesting public boolean isConnected(DatanodeDetails details) {
return isConnected(channels.get(details.getUuid()));
}

private boolean isConnected(ManagedChannel channel) {
return channel != null && !channel.isTerminated() && !channel.isShutdown();
}

@Override
public void close() {
@Override public void close() {
closed = true;
for (ManagedChannel channel : channels.values()) {
channel.shutdownNow();
@@ -216,77 +213,82 @@ public ContainerCommandResponseProto sendCommand(
}

@Override
public XceiverClientReply sendCommand(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request, CheckedFunction function)
throws IOException {
Preconditions.checkState(HddsUtils.isReadOnly(request));
return sendCommandWithTraceIDAndRetry(request, excludeDns);
try {
XceiverClientReply reply;
reply = sendCommandWithTraceIDAndRetry(request, function);
ContainerCommandResponseProto responseProto = reply.getResponse().get();
return responseProto;
} catch (ExecutionException | InterruptedException e) {
throw new IOException("Failed to execute command " + request, e);
}
}
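
The raw CheckedFunction parameter is the validation hook this change introduces: a caller can reject a response by throwing, and the request is then retried on the next datanode. A minimal caller-side sketch, assuming CheckedFunction is a single-method functional interface of roughly the shape "R apply(T input) throws E"; the exact generic parameters are not visible in this diff, and xceiverClient, readChunkRequest and the SUCCESS check below are illustrative:

// Illustrative validator: throw an IOException to reject the response so that
// sendCommandWithRetry moves on to the next datanode in the pipeline.
// (Adjust the type parameters to the actual CheckedFunction declaration.)
CheckedFunction<ContainerCommandResponseProto, Void, IOException> validator =
    response -> {
      if (response.getResult() != ContainerProtos.Result.SUCCESS) {
        throw new IOException("Datanode returned " + response.getResult());
      }
      return null;
    };

// The reply returned here is the first response that passed validation.
ContainerCommandResponseProto reply =
    xceiverClient.sendCommand(readChunkRequest, validator);
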

private XceiverClientReply sendCommandWithTraceIDAndRetry(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
ContainerCommandRequestProto request, CheckedFunction function)
throws IOException {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name())
.startActive(true)) {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
return sendCommandWithRetry(finalPayload, excludeDns);
.setTraceID(TracingUtil.exportCurrentSpan()).build();
return sendCommandWithRetry(finalPayload, function);
}
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
ContainerCommandRequestProto request, CheckedFunction function)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;

// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round robin fashion.

// TODO: cache the correct leader info in here, so that any subsequent calls
// should first go to leader
List<DatanodeDetails> dns = pipeline.getNodes();
List<DatanodeDetails> healthyDns =
excludeDns != null ? dns.stream().filter(dnId -> {
for (DatanodeDetails excludeId : excludeDns) {
if (dnId.equals(excludeId)) {
return false;
}
}
return true;
}).collect(Collectors.toList()) : dns;
XceiverClientReply reply = new XceiverClientReply(null);
for (DatanodeDetails dn : healthyDns) {
for (DatanodeDetails dn : pipeline.getNodes()) {
try {
LOG.debug("Executing command " + request + " on datanode " + dn);
// In case the command gets retried on a 2nd datanode,
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
responseProto = sendCommandAsync(request, dn).getResponse().get();
if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
break;
if (function != null) {
function.apply(responseProto);
}
} catch (ExecutionException | InterruptedException e) {
break;
} catch (ExecutionException | InterruptedException | IOException e) {
LOG.debug("Failed to execute command " + request + " on datanode " + dn
.getUuidString(), e);
if (Status.fromThrowable(e.getCause()).getCode()
== Status.UNAUTHENTICATED.getCode()) {
throw new SCMSecurityException("Failed to authenticate with "
+ "GRPC XceiverServer with Ozone block token.");
if (!(e instanceof IOException)) {
if (Status.fromThrowable(e.getCause()).getCode()
== Status.UNAUTHENTICATED.getCode()) {
throw new SCMSecurityException("Failed to authenticate with "
+ "GRPC XceiverServer with Ozone block token.");
}
ioException = new IOException(e);
} else {
ioException = (IOException) e;
}
responseProto = null;
}
}

if (responseProto != null) {
reply.setResponse(CompletableFuture.completedFuture(responseProto));
return reply;
} else {
throw new IOException(
"Failed to execute command " + request + " on the pipeline "
+ pipeline.getId());
Preconditions.checkNotNull(ioException);
LOG.error("Failed to execute command " + request + " on the pipeline "
+ pipeline.getId());
throw ioException;
}
}

File: XceiverClientRatis.java
@@ -22,13 +22,15 @@
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.storage.CheckedFunction;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.protobuf
@@ -69,7 +71,8 @@
* The underlying RPC mechanism can be chosen via the constructor.
*/
public final class XceiverClientRatis extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
public static final Logger LOG =
LoggerFactory.getLogger(XceiverClientRatis.class);

Review comment (Member), suggested change: "public static final Logger LOG =" -> "private static final Logger LOG ="

public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
@@ -248,13 +251,17 @@ public XceiverClientReply watchForCommit(long index, long timeout)
return clientReply;
}
LOG.debug("commit index : {} watch timeout : {}", index, timeout);
CompletableFuture<RaftClientReply> replyFuture = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
RaftClientReply reply;
try {
CompletableFuture<RaftClientReply> replyFuture = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
replyFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException toe) {
LOG.warn("3 way commit failed ", toe);
} catch (Exception e) {
Throwable t = HddsClientUtils.checkForException(e);
LOG.warn("3 way commit failed ", e);
if (t instanceof GroupMismatchException) {
throw e;
}
reply = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get(timeout, TimeUnit.MILLISECONDS);
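
Put together with the parts hidden by the collapsed context, the new watch path has roughly this shape (a reconstructed sketch, not the literal method body; client stands in for getClient(), and logging is omitted):

// Prefer the strict guarantee, but fall back to a majority watch when the
// ALL_COMMITTED watch fails for any reason other than a group mismatch.
try {
  client.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
      .get(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
  Throwable t = HddsClientUtils.checkForException(e);
  if (t instanceof GroupMismatchException) {
    // The pipeline no longer matches the Raft group; retrying here cannot help.
    throw e;
  }
  // Weaker guarantee: wait only for a majority so the write can proceed.
  client.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
      .get(timeout, TimeUnit.MILLISECONDS);
}
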
File: HddsClientUtils.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.ipc.Client;
@@ -40,6 +41,10 @@
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -50,8 +55,10 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;

/**
* Utility methods for Ozone and Container Clients.
Expand All @@ -72,6 +79,18 @@ public final class HddsClientUtils {
private HddsClientUtils() {
}

private static final List<Class<? extends Exception>> EXCEPTION_LIST =
new ArrayList<Class<? extends Exception>>() {{
add(TimeoutException.class);
add(ContainerNotOpenException.class);
add(RaftRetryFailureException.class);
add(AlreadyClosedException.class);
add(GroupMismatchException.class);
// Not Replicated Exception will be thrown if watch For commit
// does not succeed
add(NotReplicatedException.class);
}};
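
The double-brace initializer above works, but it creates an anonymous ArrayList subclass and leaves the list mutable through getExceptionList(). A sketch of an equivalent, more conventional construction (assuming java.util.Arrays and java.util.Collections are imported):

private static final List<Class<? extends Exception>> EXCEPTION_LIST =
    Collections.unmodifiableList(Arrays.<Class<? extends Exception>>asList(
        TimeoutException.class,
        ContainerNotOpenException.class,
        RaftRetryFailureException.class,
        AlreadyClosedException.class,
        GroupMismatchException.class,
        // NotReplicatedException is thrown if watchForCommit does not succeed
        NotReplicatedException.class));
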

/**
* Date format that used in ozone. Here the format is thread safe to use.
*/
@@ -290,4 +309,23 @@ public static SCMSecurityProtocol getScmSecurityClient(
Client.getRpcTimeout(conf)));
return scmSecurityClient;
}

public static Throwable checkForException(Exception e) throws IOException {
Throwable t = e;
while (t != null) {
for (Class<? extends Exception> cls : getExceptionList()) {
if (cls.isInstance(t)) {
return t;
}
}
t = t.getCause();
}

throw e instanceof IOException ? (IOException)e : new IOException(e);
}


public static List<Class<? extends Exception>> getExceptionList() {
return EXCEPTION_LIST;
}
}
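
For reference, a hedged sketch of how a caller unwraps failures with checkForException; future is a hypothetical response future, handleContainerClosed is a hypothetical recovery hook, and the enclosing method is assumed to declare throws IOException:

try {
  future.get();
} catch (ExecutionException | InterruptedException e) {
  // Walks the cause chain and returns the first cause found in EXCEPTION_LIST;
  // anything else is rethrown wrapped as an IOException.
  Throwable cause = HddsClientUtils.checkForException(e);
  if (cause instanceof ContainerNotOpenException) {
    // The container was closed mid-operation; the caller can switch to a new
    // block or pipeline instead of failing the whole write.
    handleContainerClosed();
  }
}
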