Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.CheckedFunction;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;

Expand Down Expand Up @@ -83,15 +85,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
* data nodes.
*
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
* @param config -- Ozone Config
*/
public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
super();
Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config);
this.pipeline = pipeline;
this.config = config;
this.secConfig = new SecurityConfig(config);
this.secConfig = new SecurityConfig(config);
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
Expand All @@ -101,9 +103,8 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {

/**
* To be used when grpc token is not enabled.
* */
@Override
public void connect() throws Exception {
*/
@Override public void connect() throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
DatanodeDetails dn = this.pipeline.getFirstNode();
// just make a connection to the 1st datanode at the beginning
Expand All @@ -112,9 +113,8 @@ public void connect() throws Exception {

/**
* Passed encoded token to GRPC header when security is enabled.
* */
@Override
public void connect(String encodedToken) throws Exception {
*/
@Override public void connect(String encodedToken) throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
DatanodeDetails dn = this.pipeline.getFirstNode();
// just make a connection to the 1st datanode at the beginning
Expand All @@ -132,11 +132,10 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
}

// Add credential context to the client call
String userName = UserGroupInformation.getCurrentUser()
.getShortUserName();
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
LOG.debug("Connecting to server Port : " + dn.getIpAddress());
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
.getIpAddress(), port).usePlaintext()
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.intercept(new ClientCredentialInterceptor(userName, encodedToken),
new GrpcClientInterceptor());
Expand All @@ -149,8 +148,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
if (trustCertCollectionFile != null) {
sslContextBuilder.trustManager(trustCertCollectionFile);
}
if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null &&
privateKeyFile != null) {
if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null
&& privateKeyFile != null) {
sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile);
}

Expand All @@ -174,17 +173,15 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
*
* @return True if the connection is alive, false otherwise.
*/
@VisibleForTesting
public boolean isConnected(DatanodeDetails details) {
@VisibleForTesting public boolean isConnected(DatanodeDetails details) {
return isConnected(channels.get(details.getUuid()));
}

private boolean isConnected(ManagedChannel channel) {
return channel != null && !channel.isTerminated() && !channel.isShutdown();
}

@Override
public void close() {
@Override public void close() {
closed = true;
for (ManagedChannel channel : channels.values()) {
channel.shutdownNow();
Expand Down Expand Up @@ -216,77 +213,82 @@ public ContainerCommandResponseProto sendCommand(
}

@Override
public XceiverClientReply sendCommand(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request, CheckedFunction function)
throws IOException {
Preconditions.checkState(HddsUtils.isReadOnly(request));
return sendCommandWithTraceIDAndRetry(request, excludeDns);
try {
XceiverClientReply reply;
reply = sendCommandWithTraceIDAndRetry(request, function);
ContainerCommandResponseProto responseProto = reply.getResponse().get();
return responseProto;
} catch (ExecutionException | InterruptedException e) {
throw new IOException("Failed to execute command " + request, e);
}
}

private XceiverClientReply sendCommandWithTraceIDAndRetry(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
ContainerCommandRequestProto request, CheckedFunction function)
throws IOException {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name())
.startActive(true)) {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
return sendCommandWithRetry(finalPayload, excludeDns);
.setTraceID(TracingUtil.exportCurrentSpan()).build();
return sendCommandWithRetry(finalPayload, function);
}
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
ContainerCommandRequestProto request, CheckedFunction function)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;

// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round robin fashion.

// TODO: cache the correct leader info in here, so that any subsequent calls
// should first go to leader
List<DatanodeDetails> dns = pipeline.getNodes();
List<DatanodeDetails> healthyDns =
excludeDns != null ? dns.stream().filter(dnId -> {
for (DatanodeDetails excludeId : excludeDns) {
if (dnId.equals(excludeId)) {
return false;
}
}
return true;
}).collect(Collectors.toList()) : dns;
XceiverClientReply reply = new XceiverClientReply(null);
for (DatanodeDetails dn : healthyDns) {
for (DatanodeDetails dn : pipeline.getNodes()) {
try {
LOG.debug("Executing command " + request + " on datanode " + dn);
// In case the command gets retried on a 2nd datanode,
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
responseProto = sendCommandAsync(request, dn).getResponse().get();
if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
break;
if (function != null) {
function.apply(responseProto);
}
} catch (ExecutionException | InterruptedException e) {
break;
} catch (ExecutionException | InterruptedException | IOException e) {
LOG.debug("Failed to execute command " + request + " on datanode " + dn
.getUuidString(), e);
if (Status.fromThrowable(e.getCause()).getCode()
== Status.UNAUTHENTICATED.getCode()) {
throw new SCMSecurityException("Failed to authenticate with "
+ "GRPC XceiverServer with Ozone block token.");
if (!(e instanceof IOException)) {
if (Status.fromThrowable(e.getCause()).getCode()
== Status.UNAUTHENTICATED.getCode()) {
throw new SCMSecurityException("Failed to authenticate with "
+ "GRPC XceiverServer with Ozone block token.");
}
ioException = new IOException(e);
} else {
ioException = (IOException) e;
}
responseProto = null;
}
}

if (responseProto != null) {
reply.setResponse(CompletableFuture.completedFuture(responseProto));
return reply;
} else {
throw new IOException(
"Failed to execute command " + request + " on the pipeline "
+ pipeline.getId());
Preconditions.checkNotNull(ioException);
LOG.error("Failed to execute command " + request + " on the pipeline "
+ pipeline.getId());
throw ioException;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.storage.CheckedFunction;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;

import io.opentracing.Scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
.StorageContainerException;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class BlockInputStream extends InputStream implements Seekable {
private List<ByteBuffer> buffers;
private int bufferIndex;
private long bufferPosition;
private final boolean verifyChecksum;
private boolean verifyChecksum;

/**
* Creates a new BlockInputStream.
Expand Down Expand Up @@ -323,41 +324,8 @@ private boolean chunksRemaining() {
private synchronized void readChunkFromContainer() throws IOException {
// Read the chunk at chunkIndex
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
List<DatanodeDetails> excludeDns = null;
ByteString byteString;
List<DatanodeDetails> dnList = getDatanodeList();
while (true) {
List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
try {
if (byteString.size() != chunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new IOException(String
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
chunkInfo.getChunkName(), chunkInfo.getLen(),
byteString.size()));
}
ChecksumData checksumData =
ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
if (verifyChecksum) {
Checksum.verifyChecksum(byteString, checksumData);
}
break;
} catch (IOException ioe) {
// we will end up in this situation only if the checksum mismatch
// happens or the length of the chunk mismatches.
// In this case, read should be retried on a different replica.
// TODO: Inform SCM of a possible corrupt container replica here
if (excludeDns == null) {
excludeDns = new ArrayList<>();
}
excludeDns.addAll(dnListFromReadChunkCall);
if (excludeDns.size() == dnList.size()) {
throw ioe;
}
}
}

byteString = readChunk(chunkInfo);
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
chunkIndexOfCurrentBuffer = chunkIndex;
Expand All @@ -372,28 +340,17 @@ private synchronized void readChunkFromContainer() throws IOException {
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
protected ByteString readChunk(final ChunkInfo chunkInfo,
List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
protected ByteString readChunk(final ChunkInfo chunkInfo)
throws IOException {
XceiverClientReply reply;
ReadChunkResponseProto readChunkResponse = null;
ReadChunkResponseProto readChunkResponse;
try {
reply = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
ContainerProtos.ContainerCommandResponseProto response;
response = reply.getResponse().get();
ContainerProtocolCalls.validateContainerResponse(response);
readChunkResponse = response.getReadChunk();
dnListFromReply.addAll(reply.getDatanodes());
readChunkResponse = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, validator);
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
} catch (ExecutionException | InterruptedException e) {
throw new IOException(
"Failed to execute ReadChunk command for chunk " + chunkInfo
.getChunkName(), e);
}
return readChunkResponse.getData();
}
Expand All @@ -403,6 +360,34 @@ protected List<DatanodeDetails> getDatanodeList() {
return xceiverClient.getPipeline().getNodes();
}

private CheckedFunction<ContainerProtos.ContainerCommandResponseProto, IOException>
validator = (response) -> {
ReadChunkResponseProto readChunkResponse;
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
ByteString byteString;
try {
ContainerProtocolCalls.validateContainerResponse(response);
readChunkResponse = response.getReadChunk();
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
}
byteString = readChunkResponse.getData();
if (byteString.size() != chunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
}
ChecksumData checksumData =
ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
if (verifyChecksum) {
Checksum.verifyChecksum(byteString, checksumData);
}
};

@Override
public synchronized void seek(long pos) throws IOException {
if (pos < 0 || (chunks.size() == 0 && pos > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ private static class DummyBlockInputStream extends BlockInputStream {
}

@Override
protected ByteString readChunk(final ChunkInfo chunkInfo,
List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
protected ByteString readChunk(final ChunkInfo chunkInfo)
throws IOException {
return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
}
Expand Down
Loading