Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;

import io.opentracing.Scope;
Expand Down Expand Up @@ -166,16 +165,14 @@ private synchronized void connectToDatanode(DatanodeDetails dn,
}

// Add credential context to the client call
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
if (LOG.isDebugEnabled()) {
LOG.debug("Nodes in pipeline : {}", pipeline.getNodes().toString());
LOG.debug("Connecting to server : {}", dn.getIpAddress());
}
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.intercept(new ClientCredentialInterceptor(userName, encodedToken),
new GrpcClientInterceptor());
.intercept(new GrpcClientInterceptor());
if (secConfig.isGrpcTlsEnabled()) {
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
if (caCert != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.retry.RetryPolicies;
Expand Down Expand Up @@ -390,6 +392,72 @@ public static boolean isReadOnly(
}
}

/**
* Not all datanode container cmd protocol has embedded ozone block token.
* Block token are issued by Ozone Manager and return to Ozone client to
* read/write data on datanode via input/output stream.
* Ozone datanode uses this helper to decide which command requires block
* token.
* @param cmdType
* @return true if it is a cmd that block token should be checked when
* security is enabled
* false if block token does not apply to the command.
*
*/
public static boolean requireBlockToken(
ContainerProtos.Type cmdType) {
switch (cmdType) {
case ReadChunk:
case GetBlock:
case WriteChunk:
case PutBlock:
case PutSmallFile:
case GetSmallFile:
return true;
default:
return false;
}
}

/**
* Return the block ID of container commands that are related to blocks.
* @param msg container command
* @return block ID.
*/
public static BlockID getBlockID(ContainerCommandRequestProto msg) {
switch (msg.getCmdType()) {
case ReadChunk:
if (msg.hasReadChunk()) {
return BlockID.getFromProtobuf(msg.getReadChunk().getBlockID());
}
case GetBlock:
if (msg.hasGetBlock()) {
return BlockID.getFromProtobuf(msg.getGetBlock().getBlockID());
}
case WriteChunk:
if (msg.hasWriteChunk()) {
return BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
}
case PutBlock:
if (msg.hasPutBlock()) {
return BlockID.getFromProtobuf(msg.getPutBlock().getBlockData()
.getBlockID());
}
case PutSmallFile:
if (msg.hasPutSmallFile()) {
return BlockID.getFromProtobuf(msg.getPutSmallFile().getBlock()
.getBlockData().getBlockID());
}
case GetSmallFile:
if (msg.hasGetSmallFile()) {
return BlockID.getFromProtobuf(msg.getGetSmallFile().getBlock()
.getBlockID());
}
default:
return null;
}
}

/**
* Register the provided MBean with additional JMX ObjectName properties.
* If additional properties are not supported then fallback to registering
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*/
public abstract class XceiverClientSpi implements Closeable {

final private AtomicInteger referenceCount;
private final AtomicInteger referenceCount;
private boolean isEvicted;

XceiverClientSpi() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.hadoop.hdds.security.token;

import com.google.common.base.Strings;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
Expand Down Expand Up @@ -55,68 +57,68 @@ private boolean isExpired(long expiryDate) {
}

@Override
public UserGroupInformation verify(String user, String tokenStr)
throws SCMSecurityException {
if (conf.isBlockTokenEnabled()) {
// TODO: add audit logs.

if (Strings.isNullOrEmpty(tokenStr)) {
throw new BlockTokenException("Fail to find any token (empty or " +
"null.)");
}
final Token<OzoneBlockTokenIdentifier> token = new Token();
OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
try {
token.decodeFromUrlString(tokenStr);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Verifying token:{} for user:{} ", token, user);
}
ByteArrayInputStream buf = new ByteArrayInputStream(
token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
tokenId.readFields(in);

} catch (IOException ex) {
throw new BlockTokenException("Failed to decode token : " + tokenStr);
}
public void verify(String user, String tokenStr,
ContainerProtos.Type cmd, String id) throws SCMSecurityException {
if (!conf.isBlockTokenEnabled() || !HddsUtils.requireBlockToken(cmd)) {
return;
}

// TODO: add audit logs.
if (Strings.isNullOrEmpty(tokenStr)) {
throw new BlockTokenException("Fail to find any token (empty or " +
"null.)");
}

if (caClient == null) {
throw new SCMSecurityException("Certificate client not available " +
"to validate token");
final Token<OzoneBlockTokenIdentifier> token = new Token();
OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
try {
token.decodeFromUrlString(tokenStr);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Verifying token:{} for user:{} ", token, user);
}
ByteArrayInputStream buf = new ByteArrayInputStream(
token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
tokenId.readFields(in);

X509Certificate singerCert;
singerCert = caClient.getCertificate(tokenId.getOmCertSerialId());
} catch (IOException ex) {
throw new BlockTokenException("Failed to decode token : " + tokenStr);
}

if (singerCert == null) {
throw new BlockTokenException("Can't find signer certificate " +
"(OmCertSerialId: " + tokenId.getOmCertSerialId() +
") of the block token for user: " + tokenId.getUser());
}
boolean validToken = caClient.verifySignature(tokenId.getBytes(),
token.getPassword(), singerCert);
if (!validToken) {
throw new BlockTokenException("Invalid block token for user: " +
tokenId.getUser());
}
if (caClient == null) {
throw new SCMSecurityException("Certificate client not available " +
"to validate token");
}

// check expiration
if (isExpired(tokenId.getExpiryDate())) {
UserGroupInformation tokenUser = tokenId.getUser();
tokenUser.setAuthenticationMethod(
UserGroupInformation.AuthenticationMethod.TOKEN);
throw new BlockTokenException("Expired block token for user: " +
tokenUser);
}
// defer access mode, bcsid and maxLength check to container dispatcher
UserGroupInformation ugi = tokenId.getUser();
ugi.addToken(token);
ugi.setAuthenticationMethod(UserGroupInformation
.AuthenticationMethod.TOKEN);
return ugi;
} else {
return UserGroupInformation.createRemoteUser(user);
UserGroupInformation tokenUser = tokenId.getUser();
X509Certificate signerCert;
signerCert = caClient.getCertificate(tokenId.getOmCertSerialId());

if (signerCert == null) {
throw new BlockTokenException("Can't find signer certificate " +
"(OmCertSerialId: " + tokenId.getOmCertSerialId() +
") of the block token for user: " + tokenUser);
}
boolean validToken = caClient.verifySignature(tokenId.getBytes(),
token.getPassword(), signerCert);
if (!validToken) {
throw new BlockTokenException("Invalid block token for user: " +
tokenId.getUser());
}
// check expiration
if (isExpired(tokenId.getExpiryDate())) {
throw new BlockTokenException("Expired block token for user: " +
tokenUser);
}

// Token block id mismatch
if (!tokenId.getBlockId().equals(id)) {
throw new BlockTokenException("Block id mismatch. Token for block ID: " +
tokenId.getBlockId() + " can't be used to access block: " + id +
" by user: " + tokenUser);
}

// TODO: check cmd type and the permissions(AccessMode) in the token
}

public static boolean isTestStub() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.security.token;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -70,7 +71,7 @@ public OzoneBlockTokenIdentifier(String ownerId, String blockId,

@Override
public UserGroupInformation getUser() {
if (this.getOwnerId() == null || "".equals(this.getOwnerId())) {
if (Strings.isNullOrEmpty(this.getOwnerId())) {
return UserGroupInformation.createRemoteUser(blockId);
}
return UserGroupInformation.createRemoteUser(ownerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@

package org.apache.hadoop.hdds.security.token;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.security.UserGroupInformation;

/**
* Ozone GRPC token header verifier.
*/
public interface TokenVerifier {
/**
* Given a user and tokenStr header, return a UGI object with token if
* verified.
* Given a user, tokenStr, cmd and container/block id, verify the token.
* @param user user of the request
* @param tokenStr token str of the request
* @param cmd container command type
* @param id blockID/containerID
* @return UGI
* @throws SCMSecurityException
* @throws SCMSecurityException if token verification fails.
*/
UserGroupInformation verify(String user, String tokenStr)
throws SCMSecurityException;
void verify(String user, String tokenStr,
ContainerProtos.Type cmd, String id) throws SCMSecurityException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
.InvalidContainerStateException;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;

import io.opentracing.Scope;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -89,14 +91,17 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private final float containerCloseThreshold;
private String scmID;
private ContainerMetrics metrics;
private final TokenVerifier tokenVerifier;
private final boolean isBlockTokenEnabled;

/**
* Constructs an OzoneContainer that receives calls from
* XceiverServerHandler.
*/
public HddsDispatcher(Configuration config, ContainerSet contSet,
VolumeSet volumes, Map<ContainerType, Handler> handlers,
StateContext context, ContainerMetrics metrics) {
StateContext context, ContainerMetrics metrics,
TokenVerifier tokenVerifier) {
this.conf = config;
this.containerSet = contSet;
this.volumeSet = volumes;
Expand All @@ -106,6 +111,10 @@ public HddsDispatcher(Configuration config, ContainerSet contSet,
this.containerCloseThreshold = conf.getFloat(
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
this.isBlockTokenEnabled = conf.getBoolean(
HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED,
HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
this.tokenVerifier = tokenVerifier;
}

@Override
Expand Down Expand Up @@ -183,7 +192,15 @@ private ContainerCommandResponseProto dispatchRequest(
&& dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMMIT_DATA);

// if the command gets executed other than Ratis, the default wroite stage
try {
validateBlockToken(msg);
} catch (IOException ioe) {
StorageContainerException sce = new StorageContainerException(
"Block token verification failed. " + ioe.getMessage(), ioe,
ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
// if the command gets executed other than Ratis, the default write stage
// is WriteChunkStage.COMBINED
boolean isCombinedStage =
cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext == null
Expand Down Expand Up @@ -387,6 +404,17 @@ ContainerCommandResponseProto createContainer(
return handler.handle(requestBuilder.build(), null, null);
}

private void validateBlockToken(
ContainerCommandRequestProto msg) throws IOException {
if (isBlockTokenEnabled && tokenVerifier != null &&
HddsUtils.requireBlockToken(msg.getCmdType())) {
tokenVerifier.verify(
UserGroupInformation.getCurrentUser().getShortUserName(),
msg.getEncodedToken(), msg.getCmdType(),
HddsUtils.getBlockID(msg).getContainerBlockID().toString());
}
}

/**
* This will be called as a part of creating the log entry during
* startTransaction in Ratis on the leader node. In such cases, if the
Expand Down Expand Up @@ -444,6 +472,15 @@ public void validateContainerCommand(
audit(action, eventType, params, AuditEventStatus.FAILURE, iex);
throw iex;
}

try {
validateBlockToken(msg);
} catch (IOException ioe) {
StorageContainerException sce = new StorageContainerException(
"Block token verification failed. " + ioe.getMessage(), ioe,
ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
throw sce;
}
}

/**
Expand Down
Loading