diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 236370b2ab9e..9469db4d3f94 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -39,7 +39,6 @@
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 
 import io.opentracing.Scope;
@@ -166,7 +165,6 @@ private synchronized void connectToDatanode(DatanodeDetails dn,
     }
 
     // Add credential context to the client call
-    String userName = UserGroupInformation.getCurrentUser().getShortUserName();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Nodes in pipeline : {}", pipeline.getNodes().toString());
       LOG.debug("Connecting to server : {}", dn.getIpAddress());
@@ -174,8 +172,7 @@ private synchronized void connectToDatanode(DatanodeDetails dn,
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
-            .intercept(new ClientCredentialInterceptor(userName, encodedToken),
-                new GrpcClientInterceptor());
+            .intercept(new GrpcClientInterceptor());
     if (secConfig.isGrpcTlsEnabled()) {
       SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
       if (caCert != null) {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index c3c84b66b284..92b9345e8647 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -38,12 +38,14 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -390,6 +392,72 @@ public static boolean isReadOnly(
     }
   }
 
+  /**
+   * Not all datanode container commands carry an embedded ozone block token.
+   * Block tokens are issued by the Ozone Manager and returned to the Ozone
+   * client, which presents them when reading or writing data on a datanode
+   * via input/output streams.
+   * The Ozone datanode uses this helper to decide which commands require a
+   * block token.
+   * @param cmdType container command type
+   * @return true if the command must carry a block token when security is
+   * enabled;
+   * false if block tokens do not apply to the command.
+   */
+  public static boolean requireBlockToken(
+      ContainerProtos.Type cmdType) {
+    switch (cmdType) {
+    case ReadChunk:
+    case GetBlock:
+    case WriteChunk:
+    case PutBlock:
+    case PutSmallFile:
+    case GetSmallFile:
+      return true;
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Return the block ID of container commands that are related to blocks.
+   * @param msg container command
+   * @return block ID, or null if the command carries none.
+   */
+  public static BlockID getBlockID(ContainerCommandRequestProto msg) {
+    switch (msg.getCmdType()) {
+    case ReadChunk:
+      if (msg.hasReadChunk()) {
+        return BlockID.getFromProtobuf(msg.getReadChunk().getBlockID());
+      }
+    case GetBlock:
+      if (msg.hasGetBlock()) {
+        return BlockID.getFromProtobuf(msg.getGetBlock().getBlockID());
+      }
+    case WriteChunk:
+      if (msg.hasWriteChunk()) {
+        return BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
+      }
+    case PutBlock:
+      if (msg.hasPutBlock()) {
+        return BlockID.getFromProtobuf(msg.getPutBlock().getBlockData()
+            .getBlockID());
+      }
+    case PutSmallFile:
+      if (msg.hasPutSmallFile()) {
+        return BlockID.getFromProtobuf(msg.getPutSmallFile().getBlock()
+            .getBlockData().getBlockID());
+      }
+    case GetSmallFile:
+      if (msg.hasGetSmallFile()) {
+        return BlockID.getFromProtobuf(msg.getGetSmallFile().getBlock()
+            .getBlockID());
+      }
+    default:
+      return null;
+    }
+  }
+
   /**
    * Register the provided MBean with additional JMX ObjectName properties.
    * If additional properties are not supported then fallback to registering
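For reference, a minimal sketch of how these two helpers combine on the datanode side, mirroring the validateBlockToken method this patch adds to HddsDispatcher further below. The method name checkBlockToken and the verifier variable are illustrative, not part of the patch:

  // Illustrative only: verify the token for commands that touch block data.
  private void checkBlockToken(ContainerCommandRequestProto msg,
      TokenVerifier verifier) throws IOException {
    if (HddsUtils.requireBlockToken(msg.getCmdType())) {
      // Note: getBlockID(msg) can return null for malformed requests;
      // production code should guard against that before dereferencing.
      verifier.verify(
          UserGroupInformation.getCurrentUser().getShortUserName(),
          msg.getEncodedToken(), msg.getCmdType(),
          HddsUtils.getBlockID(msg).getContainerBlockID().toString());
    }
  }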
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 5631badf44c9..d9e5a1f37cfe 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -38,7 +38,7 @@
  */
 public abstract class XceiverClientSpi implements Closeable {
 
-  final private AtomicInteger referenceCount;
+  private final AtomicInteger referenceCount;
   private boolean isEvicted;
 
   XceiverClientSpi() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
index e94808ac9d7d..ea222dfe6bdd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hdds.security.token;
 
 import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -55,68 +57,68 @@ private boolean isExpired(long expiryDate) {
   }
 
   @Override
-  public UserGroupInformation verify(String user, String tokenStr)
-      throws SCMSecurityException {
-    if (conf.isBlockTokenEnabled()) {
-      // TODO: add audit logs.
-
-      if (Strings.isNullOrEmpty(tokenStr)) {
-        throw new BlockTokenException("Fail to find any token (empty or " +
-            "null.)");
-      }
-      final Token token = new Token();
-      OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
-      try {
-        token.decodeFromUrlString(tokenStr);
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("Verifying token:{} for user:{} ", token, user);
-        }
-        ByteArrayInputStream buf = new ByteArrayInputStream(
-            token.getIdentifier());
-        DataInputStream in = new DataInputStream(buf);
-        tokenId.readFields(in);
-
-      } catch (IOException ex) {
-        throw new BlockTokenException("Failed to decode token : " + tokenStr);
-      }
+  public void verify(String user, String tokenStr,
+      ContainerProtos.Type cmd, String id) throws SCMSecurityException {
+    if (!conf.isBlockTokenEnabled() || !HddsUtils.requireBlockToken(cmd)) {
+      return;
+    }
+
+    // TODO: add audit logs.
+    if (Strings.isNullOrEmpty(tokenStr)) {
+      throw new BlockTokenException("Fail to find any token (empty or " +
+          "null.)");
+    }
 
-      if (caClient == null) {
-        throw new SCMSecurityException("Certificate client not available " +
-            "to validate token");
+    final Token token = new Token();
+    OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
+    try {
+      token.decodeFromUrlString(tokenStr);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Verifying token:{} for user:{} ", token, user);
       }
+      ByteArrayInputStream buf = new ByteArrayInputStream(
+          token.getIdentifier());
+      DataInputStream in = new DataInputStream(buf);
+      tokenId.readFields(in);
 
-      X509Certificate singerCert;
-      singerCert = caClient.getCertificate(tokenId.getOmCertSerialId());
+    } catch (IOException ex) {
+      throw new BlockTokenException("Failed to decode token : " + tokenStr);
+    }
 
-      if (singerCert == null) {
-        throw new BlockTokenException("Can't find signer certificate " +
-            "(OmCertSerialId: " + tokenId.getOmCertSerialId() +
-            ") of the block token for user: " + tokenId.getUser());
-      }
-      boolean validToken = caClient.verifySignature(tokenId.getBytes(),
-          token.getPassword(), singerCert);
-      if (!validToken) {
-        throw new BlockTokenException("Invalid block token for user: " +
-            tokenId.getUser());
-      }
+    if (caClient == null) {
+      throw new SCMSecurityException("Certificate client not available " +
+          "to validate token");
+    }
 
-      // check expiration
-      if (isExpired(tokenId.getExpiryDate())) {
-        UserGroupInformation tokenUser = tokenId.getUser();
-        tokenUser.setAuthenticationMethod(
-            UserGroupInformation.AuthenticationMethod.TOKEN);
-        throw new BlockTokenException("Expired block token for user: " +
-            tokenUser);
-      }
-      // defer access mode, bcsid and maxLength check to container dispatcher
-      UserGroupInformation ugi = tokenId.getUser();
-      ugi.addToken(token);
-      ugi.setAuthenticationMethod(UserGroupInformation
-          .AuthenticationMethod.TOKEN);
-      return ugi;
-    } else {
-      return UserGroupInformation.createRemoteUser(user);
+    UserGroupInformation tokenUser = tokenId.getUser();
+    X509Certificate signerCert;
+    signerCert = caClient.getCertificate(tokenId.getOmCertSerialId());
+
+    if (signerCert == null) {
+      throw new BlockTokenException("Can't find signer certificate " +
+          "(OmCertSerialId: " + tokenId.getOmCertSerialId() +
+          ") of the block token for user: " + tokenUser);
+    }
+    boolean validToken = caClient.verifySignature(tokenId.getBytes(),
+        token.getPassword(), signerCert);
+    if (!validToken) {
+      throw new BlockTokenException("Invalid block token for user: " +
+          tokenId.getUser());
     }
+
+    // check expiration
+    if (isExpired(tokenId.getExpiryDate())) {
+      throw new BlockTokenException("Expired block token for user: " +
+          tokenUser);
+    }
+
+    // Token block id mismatch
+    if (!tokenId.getBlockId().equals(id)) {
+      throw new BlockTokenException("Block id mismatch. Token for block ID: " +
+          tokenId.getBlockId() + " can't be used to access block: " + id +
+          " by user: " + tokenUser);
+    }
+
+    // TODO: check cmd type and the permissions(AccessMode) in the token
   }
 
   public static boolean isTestStub() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
index 54cf18002c37..0f77f3200214 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.security.token;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -70,7 +71,7 @@ public OzoneBlockTokenIdentifier(String ownerId, String blockId,
 
   @Override
   public UserGroupInformation getUser() {
-    if (this.getOwnerId() == null || "".equals(this.getOwnerId())) {
+    if (Strings.isNullOrEmpty(this.getOwnerId())) {
       return UserGroupInformation.createRemoteUser(blockId);
     }
     return UserGroupInformation.createRemoteUser(ownerId);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
index d8170abe8170..2a5b18f36e91 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
@@ -18,21 +18,21 @@
 package org.apache.hadoop.hdds.security.token;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Ozone GRPC token header verifier.
 */
 public interface TokenVerifier {
   /**
-   * Given a user and tokenStr header, return a UGI object with token if
-   * verified.
+   * Given a user, tokenStr, cmd and container/block id, verify the token.
    * @param user user of the request
    * @param tokenStr token str of the request
+   * @param cmd container command type
+   * @param id blockID/containerID
-   * @return UGI
-   * @throws SCMSecurityException
+   * @throws SCMSecurityException if token verification fails.
    */
-  UserGroupInformation verify(String user, String tokenStr)
-      throws SCMSecurityException;
+  void verify(String user, String tokenStr,
+      ContainerProtos.Type cmd, String id) throws SCMSecurityException;
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 76f6b3cd2f18..7f4670742cd2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -33,6 +33,7 @@
     .InvalidContainerStateException;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -64,6 +65,7 @@
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 
 import io.opentracing.Scope;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,6 +91,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
   private final float containerCloseThreshold;
   private String scmID;
   private ContainerMetrics metrics;
+  private final TokenVerifier tokenVerifier;
+  private final boolean isBlockTokenEnabled;
 
   /**
    * Constructs an OzoneContainer that receives calls from
@@ -96,7 +100,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
    */
   public HddsDispatcher(Configuration config, ContainerSet contSet,
       VolumeSet volumes, Map<ContainerType, Handler> handlers,
-      StateContext context, ContainerMetrics metrics) {
+      StateContext context, ContainerMetrics metrics,
+      TokenVerifier tokenVerifier) {
     this.conf = config;
     this.containerSet = contSet;
     this.volumeSet = volumes;
@@ -106,6 +111,10 @@ public HddsDispatcher(Configuration config, ContainerSet contSet,
     this.containerCloseThreshold = conf.getFloat(
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
+    this.isBlockTokenEnabled = conf.getBoolean(
+        HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED,
+        HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
+    this.tokenVerifier = tokenVerifier;
   }
 
   @Override
@@ -183,7 +192,15 @@ private ContainerCommandResponseProto dispatchRequest(
         && dispatcherContext.getStage()
             == DispatcherContext.WriteChunkStage.COMMIT_DATA);
 
-    // if the command gets executed other than Ratis, the default wroite stage
+    try {
+      validateBlockToken(msg);
+    } catch (IOException ioe) {
+      StorageContainerException sce = new StorageContainerException(
+          "Block token verification failed. " + ioe.getMessage(), ioe,
+          ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+      return ContainerUtils.logAndReturnError(LOG, sce, msg);
+    }
+    // if the command gets executed other than Ratis, the default write stage
     // is WriteChunkStage.COMBINED
     boolean isCombinedStage =
         cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext == null
@@ -387,6 +404,17 @@ ContainerCommandResponseProto createContainer(
     return handler.handle(requestBuilder.build(), null, null);
   }
 
+  private void validateBlockToken(
+      ContainerCommandRequestProto msg) throws IOException {
+    if (isBlockTokenEnabled && tokenVerifier != null &&
+        HddsUtils.requireBlockToken(msg.getCmdType())) {
+      tokenVerifier.verify(
+          UserGroupInformation.getCurrentUser().getShortUserName(),
+          msg.getEncodedToken(), msg.getCmdType(),
+          HddsUtils.getBlockID(msg).getContainerBlockID().toString());
+    }
+  }
+
   /**
    * This will be called as a part of creating the log entry during
    * startTransaction in Ratis on the leader node. In such cases, if the
@@ -444,6 +472,15 @@ public void validateContainerCommand(
       audit(action, eventType, params, AuditEventStatus.FAILURE, iex);
       throw iex;
     }
+
+    try {
+      validateBlockToken(msg);
+    } catch (IOException ioe) {
+      StorageContainerException sce = new StorageContainerException(
+          "Block token verification failed. " + ioe.getMessage(), ioe,
+          ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+      throw sce;
+    }
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index dc5f5bc8547c..37b7d5dcff4a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -23,9 +23,7 @@
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto
     .XceiverClientProtocolServiceGrpc;
-import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,18 +39,9 @@ public class GrpcXceiverService extends
       LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
 
   private final ContainerDispatcher dispatcher;
-  private final boolean isGrpcTokenEnabled;
-  private final TokenVerifier tokenVerifier;
 
   public GrpcXceiverService(ContainerDispatcher dispatcher) {
-    this(dispatcher, false, null);
-  }
-
-  public GrpcXceiverService(ContainerDispatcher dispatcher,
-      boolean grpcTokenEnabled, TokenVerifier tokenVerifier) {
     this.dispatcher = dispatcher;
-    this.isGrpcTokenEnabled = grpcTokenEnabled;
-    this.tokenVerifier = tokenVerifier;
   }
 
   @Override
@@ -64,11 +53,6 @@ public StreamObserver send(
       @Override
      public void onNext(ContainerCommandRequestProto request) {
        try {
-          if(isGrpcTokenEnabled) {
-            // ServerInterceptors intercepts incoming request and creates ugi.
-            tokenVerifier.verify(UserGroupInformation.getCurrentUser()
-                .getShortUserName(), request.getEncodedToken());
-          }
          ContainerCommandResponseProto resp =
              dispatcher.dispatch(request, null);
          responseObserver.onNext(resp);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java
deleted file mode 100644
index 968f0c807111..000000000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.token.TokenVerifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.thirdparty.io.grpc.Context;
-import org.apache.ratis.thirdparty.io.grpc.Contexts;
-import org.apache.ratis.thirdparty.io.grpc.Metadata;
-import org.apache.ratis.thirdparty.io.grpc.ServerCall;
-import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
-import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
-import org.apache.ratis.thirdparty.io.grpc.Status;
-
-import static org.apache.hadoop.ozone.OzoneConsts.OBT_METADATA_KEY;
-import static org.apache.hadoop.ozone.OzoneConsts.USER_METADATA_KEY;
-import static org.apache.hadoop.ozone.OzoneConsts.UGI_CTX_KEY;
-/**
- * Grpc Server Interceptor for Ozone Block token.
- */
-public class ServerCredentialInterceptor implements ServerInterceptor {
-
-
-  private static final ServerCall.Listener NOOP_LISTENER =
-      new ServerCall.Listener() {
-      };
-
-  private final TokenVerifier verifier;
-
-  ServerCredentialInterceptor(TokenVerifier verifier) {
-    this.verifier = verifier;
-  }
-
-  @Override
-  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
-      ServerCall<ReqT, RespT> call, Metadata headers,
-      ServerCallHandler<ReqT, RespT> next) {
-    String token = headers.get(OBT_METADATA_KEY);
-    String user = headers.get(USER_METADATA_KEY);
-    Context ctx = Context.current();
-    try {
-      UserGroupInformation ugi = verifier.verify(user, token);
-      if (ugi == null) {
-        call.close(Status.UNAUTHENTICATED.withDescription("Missing Block " +
-            "Token from headers when block token is required."), headers);
-        return NOOP_LISTENER;
-      } else {
-        ctx = ctx.withValue(UGI_CTX_KEY, ugi);
-      }
-    } catch (SCMSecurityException e) {
-      call.close(Status.UNAUTHENTICATED.withDescription(e.getMessage())
-          .withCause(e), headers);
-      return NOOP_LISTENER;
-    }
-    return Contexts.interceptCall(ctx, call, headers, next);
-  }
-}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
deleted file mode 100644
index c6b0d9238bcc..000000000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
-import org.apache.hadoop.hdds.security.token.TokenVerifier;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN;
-
-/**
- * A server endpoint that acts as the communication layer for Ozone containers.
- */
-public abstract class XceiverServer implements XceiverServerSpi {
-
-  private final SecurityConfig secConfig;
-  private final TokenVerifier tokenVerifier;
-  private final CertificateClient caClient;
-
-  public XceiverServer(Configuration conf, CertificateClient client) {
-    Preconditions.checkNotNull(conf);
-    this.secConfig = new SecurityConfig(conf);
-    this.caClient = client;
-    tokenVerifier = new BlockTokenVerifier(secConfig, getCaClient());
-  }
-
-  /**
-   * Default implementation which just validates security token if security is
-   * enabled.
-   *
-   * @param request ContainerCommandRequest
-   */
-  @Override
-  public void submitRequest(ContainerCommandRequestProto request,
-      HddsProtos.PipelineID pipelineID) throws IOException {
-    if (secConfig.isSecurityEnabled()) {
-      String encodedToken = request.getEncodedToken();
-      if (encodedToken == null) {
-        throw new SCMSecurityException("Security is enabled but client " +
-            "request is missing block token.", MISSING_BLOCK_TOKEN);
-      }
-      tokenVerifier.verify(encodedToken, encodedToken);
-    }
-  }
-
-  @VisibleForTesting
-  protected CertificateClient getCaClient() {
-    return caClient;
-  }
-
-  protected SecurityConfig getSecurityConfig() {
-    return secConfig;
-  }
-
-  protected TokenVerifier getBlockTokenVerifier() {
-    return tokenVerifier;
-  }
-
-  public SecurityConfig getSecConfig() {
-    return secConfig;
-  }
-
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index bb352ea5165c..441d9c8a0568 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.
     StorageContainerException;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
@@ -59,7 +60,7 @@
  * Creates a Grpc server endpoint that acts as the communication layer for
 * Ozone containers.
 */
-public final class XceiverServerGrpc extends XceiverServer {
+public final class XceiverServerGrpc implements XceiverServerSpi {
   private static final Logger
       LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
   private static final String COMPONENT = "dn";
@@ -79,7 +80,6 @@ public final class XceiverServerGrpc extends XceiverServer {
   public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
       ContainerDispatcher dispatcher, CertificateClient caClient,
       BindableService... additionalServices) {
-    super(conf, caClient);
     Preconditions.checkNotNull(conf);
 
     this.id = datanodeDetails.getUuid();
@@ -96,25 +96,21 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
         ((NettyServerBuilder) ServerBuilder.forPort(port))
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
 
-    ServerCredentialInterceptor credInterceptor =
-        new ServerCredentialInterceptor(getBlockTokenVerifier());
     GrpcServerInterceptor tracingInterceptor = new GrpcServerInterceptor();
     nettyServerBuilder.addService(ServerInterceptors.intercept(
-        new GrpcXceiverService(dispatcher,
-            getSecurityConfig().isBlockTokenEnabled(),
-            getBlockTokenVerifier()), credInterceptor,
-        tracingInterceptor));
+        new GrpcXceiverService(dispatcher), tracingInterceptor));
 
     for (BindableService service : additionalServices) {
       nettyServerBuilder.addService(service);
     }
 
-    if (getSecConfig().isGrpcTlsEnabled()) {
+    SecurityConfig secConf = new SecurityConfig(conf);
+    if (secConf.isGrpcTlsEnabled()) {
       try {
         SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(
             caClient.getPrivateKey(), caClient.getCertificate());
         SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
-            sslClientContextBuilder, getSecurityConfig().getGrpcSslProvider());
+            sslClientContextBuilder, secConf.getGrpcSslProvider());
         nettyServerBuilder.sslContext(sslContextBuilder.build());
       } catch (Exception ex) {
         LOG.error("Unable to setup TLS for secure datanode GRPC endpoint.", ex);
@@ -180,8 +176,6 @@ public void submitRequest(ContainerCommandRequestProto request,
         .importAndCreateScope(
             "XceiverServerGrpc." + request.getCmdType().name(),
             request.getTraceID())) {
-
-      super.submitRequest(request, pipelineID);
       ContainerProtos.ContainerCommandResponseProto response =
           storageContainer.dispatch(request, null);
       if (response.getResult() != ContainerProtos.Result.SUCCESS) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index d37a0bfb6c2b..9c6b0c90a90c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
@@ -59,8 +58,6 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ReadChunkResponseProto;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.hdds.security.token.TokenVerifier;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -107,7 +104,7 @@
 * The write requests can be divided into requests with user data
 * (WriteChunkRequest) and other request without user data.
 *
- * Inorder to optimize the write throughput, the writeChunk request is
+ * In order to optimize the write throughput, the writeChunk request is
 * processed in 2 phases. The 2 phases are divided in
 * {@link #startTransaction(RaftClientRequest)}, in the first phase the user
 * data is written directly into the state machine via
@@ -127,7 +124,7 @@
 *
 * 2) Write chunk commit operation is executed after write chunk state machine
 * operation. This will ensure that commit operation is sync'd with the state
- * machine operation.For example, synchronization between writeChunk and
+ * machine operation. For example, synchronization between writeChunk and
 * createContainer in {@link ContainerStateMachine}.
 **/

@@ -149,8 +146,6 @@ public class ContainerStateMachine extends BaseStateMachine {
   private ExecutorService[] executors;
   private final Map<Long, Long> applyTransactionCompletionMap;
   private final Cache<Long, ByteString> stateMachineDataCache;
-  private final boolean isBlockTokenEnabled;
-  private final TokenVerifier tokenVerifier;
   private final AtomicBoolean stateMachineHealthy;
 
   private final Semaphore applyTransactionSemaphore;
@@ -162,9 +157,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   @SuppressWarnings("parameternumber")
   public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
       ContainerController containerController, ThreadPoolExecutor chunkExecutor,
-      XceiverServerRatis ratisServer, long expiryInterval,
-      boolean isBlockTokenEnabled, TokenVerifier tokenVerifier,
-      Configuration conf) {
+      XceiverServerRatis ratisServer, long expiryInterval, Configuration conf) {
     this.gid = gid;
     this.dispatcher = dispatcher;
     this.containerController = containerController;
@@ -178,8 +171,6 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
         // set the limit on no of cached entries equal to no of max threads
         // executing writeStateMachineData
         .maximumSize(chunkExecutor.getCorePoolSize()).build();
-    this.isBlockTokenEnabled = isBlockTokenEnabled;
-    this.tokenVerifier = tokenVerifier;
     this.container2BCSIDMap = new ConcurrentHashMap<>();
 
     final int numContainerOpExecutors = conf.getInt(
@@ -396,19 +387,6 @@ private ContainerCommandResponseProto dispatchCommand(
           requestProto.getCmdType(), requestProto.getContainerID(),
           requestProto.getPipelineID(), requestProto.getTraceID());
     }
-    if (isBlockTokenEnabled) {
-      try {
-        // ServerInterceptors intercepts incoming request and creates ugi.
-        tokenVerifier
-            .verify(UserGroupInformation.getCurrentUser().getShortUserName(),
-                requestProto.getEncodedToken());
-      } catch (IOException ioe) {
-        StorageContainerException sce = new StorageContainerException(
-            "Block token verification failed. " + ioe.getMessage(), ioe,
-            ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
-        return ContainerUtils.logAndReturnError(LOG, sce, requestProto);
-      }
-    }
     ContainerCommandResponseProto response =
         dispatcher.dispatch(requestProto, context);
     if (LOG.isTraceEnabled()) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 80e91cdf55de..f816bcb252b0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -38,9 +38,9 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 
 import io.opentracing.Scope;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -82,7 +82,7 @@
 * Creates a ratis server endpoint that acts as the communication layer for
 * Ozone containers.
 */
-public final class XceiverServerRatis extends XceiverServer {
+public final class XceiverServerRatis implements XceiverServerSpi {
   private static final Logger LOG = LoggerFactory
       .getLogger(XceiverServerRatis.class);
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
@@ -108,13 +108,10 @@ private static long nextCallId() {
   // pipelines
   private final Set<RaftGroupId> raftGids = new HashSet<>();
 
-  @SuppressWarnings("parameternumber")
   private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, ContainerController containerController,
-      StateContext context, GrpcTlsConfig tlsConfig, CertificateClient caClient,
-      OzoneConfiguration conf)
+      StateContext context, GrpcTlsConfig tlsConfig, OzoneConfiguration conf)
       throws IOException {
-    super(conf, caClient);
     this.conf = conf;
     Objects.requireNonNull(dd, "id == null");
     datanodeDetails = dd;
@@ -153,7 +150,6 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
   private ContainerStateMachine getStateMachine(RaftGroupId gid) {
     return new ContainerStateMachine(gid, dispatcher, containerController,
         chunkExecutor, this, cacheEntryExpiryInteval,
-        getSecurityConfig().isBlockTokenEnabled(), getBlockTokenVerifier(),
         conf);
   }
 
@@ -409,7 +405,7 @@ public static XceiverServerRatis newXceiverServerRatis(
         new SecurityConfig(ozoneConf), caClient);
 
     return new XceiverServerRatis(datanodeDetails, localPort, dispatcher,
-        containerController, context, tlsConfig, caClient, ozoneConf);
+        containerController, context, tlsConfig, ozoneConf);
   }
 
   @Override
@@ -493,7 +489,6 @@ private void processReply(RaftClientReply reply) throws IOException {
   @Override
   public void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID) throws IOException {
-    super.submitRequest(request, pipelineID);
     RaftClientReply reply;
     try (Scope scope = TracingUtil
         .importAndCreateScope(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index a026f0e8757b..6bce22cc7d21 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -28,6 +28,8 @@
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -103,8 +105,11 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
           Handler.getHandlerForContainerType(
               containerType, conf, context, containerSet, volumeSet, metrics));
     }
+
+    SecurityConfig secConf = new SecurityConfig(conf);
     this.hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
-        handlers, context, metrics);
+        handlers, context, metrics, secConf.isBlockTokenEnabled() ?
+            new BlockTokenVerifier(secConf, certClient) : null);
 
     /*
     * ContainerController is the control plane
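To actually exercise this wiring, both security and block tokens have to be switched on. A minimal configuration sketch using only keys that appear elsewhere in this patch (behavior described in the comment is my reading of the wiring above, not a quote from the patch):

  OzoneConfiguration conf = new OzoneConfiguration();
  conf.setBoolean(OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY, true);
  conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);
  // With these set, OzoneContainer passes a BlockTokenVerifier into
  // HddsDispatcher; otherwise the verifier argument is null and dispatch
  // skips token checks.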
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 26491492d6d8..e9e8da8880bf 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -102,7 +102,7 @@ public void testContainerCloseActionWhenFull() throws IOException {
           containerSet, volumeSet, metrics));
     }
     HddsDispatcher hddsDispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, context, metrics);
+        conf, containerSet, volumeSet, handlers, context, metrics, null);
     hddsDispatcher.setScmId(scmId.toString());
     ContainerCommandResponseProto responseOne = hddsDispatcher
         .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
@@ -219,7 +219,7 @@ private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
     }
 
     HddsDispatcher hddsDispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, context, metrics);
+        conf, containerSet, volumeSet, handlers, context, metrics, null);
     hddsDispatcher.setScmId(scmId.toString());
     return hddsDispatcher;
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
index a6ba103174e6..cb40f959f43e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
@@ -72,7 +72,7 @@ public void setup() throws Exception {
           containerType, conf, context, containerSet, volumeSet, metrics));
     }
     this.dispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, null, metrics);
+        conf, containerSet, volumeSet, handlers, null, metrics, null);
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 20fa092b2d00..497e717b1f70 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -56,7 +56,7 @@ private RatisPipelineUtils() {
   * @param grpcTlsConfig
   * @throws IOException
   */
-  static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
+  public static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
      GrpcTlsConfig grpcTlsConfig) {
    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
    if (LOG.isDebugEnabled()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
index 1175229bcaa6..76eee6a1ca3e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
@@ -94,7 +94,7 @@ public void testContainerStateMachineIdempotency() throws Exception {
      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
          ContainerTestHelper
              .getWriteChunkRequest(container.getPipeline(), blockID,
-                  data.length);
+                  data.length, null);
      client.sendCommand(writeChunkRequest);

      //Make the write chunk request again without requesting for overWrite
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index 41ebb634c300..2c1fa1f90f0a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -149,7 +149,7 @@ public void testReleaseBuffers() throws Exception {
      bufferList.clear();
      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
          ContainerTestHelper
-              .getWriteChunkRequest(pipeline, blockID, chunkSize);
+              .getWriteChunkRequest(pipeline, blockID, chunkSize, null);
      // add the data to the buffer pool
      ByteBuffer byteBuffer = bufferPool.allocateBufferIfNeeded().put(
          writeChunkRequest.getWriteChunk().getData().asReadOnlyByteBuffer());
@@ -225,7 +225,7 @@ public void testReleaseBuffersOnException() throws Exception {
      bufferList.clear();
      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
          ContainerTestHelper
-              .getWriteChunkRequest(pipeline, blockID, chunkSize);
+              .getWriteChunkRequest(pipeline, blockID, chunkSize, null);
      // add the data to the buffer pool
      ByteBuffer byteBuffer = bufferPool.allocateBufferIfNeeded().put(
          writeChunkRequest.getWriteChunk().getData().asReadOnlyByteBuffer());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 79900206420e..0fb15d0481cc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -489,7 +489,7 @@ public void testWriteStateMachineDataIdempotencyWithClosedContainer()
          try {
            xceiverClient.sendCommand(ContainerTestHelper
                .getWriteChunkRequest(pipeline, omKeyLocationInfo.getBlockID(),
-                    1024, new Random().nextInt()));
+                    1024, new Random().nextInt(), null));
            latch.countDown();
          } catch (IOException e) {
            latch.countDown();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 30e6d2b36d02..7bad21469c8e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -34,6 +34,7 @@
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
+import joptsimple.internal.Strings;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -218,14 +219,16 @@ public static void setDataChecksum(ChunkInfo info, ByteBuffer data)
   * @param pipeline - A set of machines where this container lives.
   * @param blockID - Block ID of the chunk.
   * @param datalen - Length of data.
+   * @param token - block token.
   * @return ContainerCommandRequestProto
   * @throws IOException
   */
  public static ContainerCommandRequestProto getWriteChunkRequest(
-      Pipeline pipeline, BlockID blockID, int datalen) throws IOException {
+      Pipeline pipeline, BlockID blockID, int datalen, String token)
+      throws IOException {
    LOG.trace("writeChunk {} (blockID={}) to pipeline={}",
        datalen, blockID, pipeline);
-    return getWriteChunkRequest(pipeline, blockID, datalen, 0);
+    return getWriteChunkRequest(pipeline, blockID, datalen, 0, token);
  }

  /**
@@ -234,11 +237,12 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
   * @param pipeline - A set of machines where this container lives.
   * @param blockID - Block ID of the chunk.
   * @param datalen - Length of data.
+   * @param token - block token.
   * @return ContainerCommandRequestProto
   * @throws IOException
   */
  public static ContainerCommandRequestProto getWriteChunkRequest(
-      Pipeline pipeline, BlockID blockID, int datalen, int seq)
+      Pipeline pipeline, BlockID blockID, int datalen, int seq, String token)
      throws IOException {
    LOG.trace("writeChunk {} (blockID={}) to pipeline={}",
        datalen, blockID, pipeline);
@@ -261,6 +265,9 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
    request.setContainerID(blockID.getContainerID());
    request.setWriteChunk(writeRequest);
    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    if (!Strings.isNullOrEmpty(token)) {
+      request.setEncodedToken(token);
+    }
    return request.build();
  }

@@ -501,8 +508,22 @@ public static ContainerCommandRequestProto getUpdateContainerRequest(
  public static ContainerCommandRequestProto getPutBlockRequest(
      Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeRequest)
      throws IOException {
-    LOG.trace("putBlock: {} to pipeline={}",
-        writeRequest.getBlockID(), pipeline);
+    return getPutBlockRequest(pipeline, null, writeRequest);
+  }
+
+  /**
+   * Returns the PutBlockRequest for test purposes.
+   * @param pipeline - pipeline.
+   * @param token - block token.
+   * @param writeRequest - Write Chunk Request.
+   * @return - Request
+   */
+  public static ContainerCommandRequestProto getPutBlockRequest(
+      Pipeline pipeline, String token,
+      ContainerProtos.WriteChunkRequestProto writeRequest)
+      throws IOException {
+    LOG.trace("putBlock: {} to pipeline={} with token {}",
+        writeRequest.getBlockID(), pipeline, token);
    ContainerProtos.PutBlockRequestProto.Builder putRequest =
        ContainerProtos.PutBlockRequestProto.newBuilder();

@@ -521,6 +542,9 @@ public static ContainerCommandRequestProto getPutBlockRequest(
    request.setContainerID(blockData.getContainerID());
    request.setPutBlock(putRequest);
    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    if (!Strings.isNullOrEmpty(token)) {
+      request.setEncodedToken(token);
+    }
    return request.build();
  }
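With the extra parameter, tests can drive both the authenticated and unauthenticated paths from the same helpers. A short usage sketch, where encodedToken is assumed to come from an OzoneBlockTokenSecretManager in the test setup:

  // Request that should fail token verification on a secured datanode.
  ContainerCommandRequestProto noToken =
      ContainerTestHelper.getWriteChunkRequest(pipeline, blockID, 1024, null);
  // Request carrying a valid, URL-encoded block token.
  ContainerCommandRequestProto withToken =
      ContainerTestHelper.getWriteChunkRequest(pipeline, blockID, 1024,
          encodedToken);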
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
index 3967c0ccf722..c1f85de4f853 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -136,7 +136,7 @@ static void runContainerStateMachineMetrics(
          getTestContainerID());
      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
          ContainerTestHelper.getWriteChunkRequest(
-              pipeline, blockID, 1024);
+              pipeline, blockID, 1024, null);
      ContainerCommandResponseProto response =
          client.sendCommand(writeChunkRequest);
      Assert.assertEquals(ContainerProtos.Result.SUCCESS,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 43c354c146e7..1d306533bfde 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -108,7 +108,7 @@ public void testContainerMetrics() throws Exception {
            containerSet, volumeSet, metrics));
      }
      HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet,
-          volumeSet, handlers, context, metrics);
+          volumeSet, handlers, context, metrics, null);
      dispatcher.setScmId(UUID.randomUUID().toString());

      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null,
@@ -129,10 +129,10 @@ public void testContainerMetrics() throws Exception {
      // Write Chunk
      BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
      ContainerTestHelper.getWriteChunkRequest(
-          pipeline, blockID, 1024);
+          pipeline, blockID, 1024, null);
      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
          ContainerTestHelper.getWriteChunkRequest(
-              pipeline, blockID, 1024);
+              pipeline, blockID, 1024, null);
      response = client.sendCommand(writeChunkRequest);
      Assert.assertEquals(ContainerProtos.Result.SUCCESS,
          response.getResult());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 70a88af645a4..67155ac48e27 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -564,7 +564,7 @@ public static void createContainerForTesting(XceiverClientSpi client,
    BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
        ContainerTestHelper.getWriteChunkRequest(client.getPipeline(),
-            blockID, dataLen);
+            blockID, dataLen, null);
    ContainerProtos.ContainerCommandResponseProto response =
        client.sendCommand(writeChunkRequest);
    Assert.assertNotNull(response);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 59d741d16add..d71c0e72529b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -228,7 +228,7 @@ public void testClientServerWithContainerDispatcher() throws Exception {
          containerSet, volumeSet, metrics));
    }
    HddsDispatcher dispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, context, metrics);
+        conf, containerSet, volumeSet, handlers, context, metrics, null);
    dispatcher.setScmId(scmId.toString());
    dispatcher.init();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index cfee1a637883..2253fe294775 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -19,9 +19,12 @@
 package org.apache.hadoop.ozone.container.server;
 
 import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -31,9 +34,9 @@
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -41,18 +44,21 @@
 import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
 import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -61,27 +67,29 @@
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.util.function.CheckedBiConsumer;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.getCreateContainerRequest;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.getTestContainerID;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.*;
 import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
-import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 /**
 * Test Container servers when security is enabled.
@@ -107,21 +115,54 @@ static public void setup() throws Exception {
     caClient = new CertificateClientTestImpl(CONF);
   }
 
+  @After
+  public void cleanUp() {
+    FileUtils.deleteQuietly(new File(CONF.get(HDDS_DATANODE_DIR_KEY)));
+  }
+
   @Test
   public void testClientServer() throws Exception {
-    DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+    DatanodeDetails dd = TestUtils.randomDatanodeDetails();
     ContainerSet containerSet = new ContainerSet();
     ContainerController controller = new ContainerController(
         containerSet, null);
+    HddsDispatcher hddsDispatcher = createDispatcher(dd,
+        UUID.randomUUID(), CONF);
     runTestClientServer(1, (pipeline, conf) -> conf
             .setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
                 pipeline.getFirstNode()
                     .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
         XceiverClientGrpc::new,
-        (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
-            new TestContainerDispatcher(), caClient,
-            createReplicationService(controller)), (dn, p) -> {
-        });
+        (dn, conf) -> new XceiverServerGrpc(dd, conf,
+            hddsDispatcher, caClient,
+            createReplicationService(controller)), (dn, p) -> {}, (p) -> {});
+  }
+
+  private static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
+      OzoneConfiguration conf) throws IOException {
+    ContainerSet containerSet = new ContainerSet();
+    conf.set(HDDS_DATANODE_DIR_KEY,
+        Paths.get(TEST_DIR, "dfs", "data", "hdds",
+            RandomStringUtils.randomAlphabetic(4)).toString());
+    VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
+    DatanodeStateMachine stateMachine = Mockito.mock(
+        DatanodeStateMachine.class);
+    StateContext context = Mockito.mock(StateContext.class);
+    Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(dd);
+    Mockito.when(context.getParent()).thenReturn(stateMachine);
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
+    for (ContainerProtos.ContainerType containerType :
+        ContainerProtos.ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(containerType, conf, context,
+              containerSet, volumeSet, metrics));
+    }
+    HddsDispatcher hddsDispatcher = new HddsDispatcher(
+        conf, containerSet, volumeSet, handlers, context, metrics,
+        new BlockTokenVerifier(new SecurityConfig(conf), caClient));
+    hddsDispatcher.setScmId(scmId.toString());
+    return hddsDispatcher;
   }
 
   @FunctionalInterface
@@ -135,21 +176,14 @@ public void testClientServerRatisGrpc() throws Exception {
     runTestClientServerRatis(GRPC, 3);
   }
 
-  @Test
-  @Ignore
-  public void testClientServerRatisNetty() throws Exception {
-    runTestClientServerRatis(NETTY, 1);
-    runTestClientServerRatis(NETTY, 3);
-  }
-
   static XceiverServerRatis newXceiverServerRatis(
       DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
     conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
         dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
     final String dir = TEST_DIR + dn.getUuid();
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
-
-    final ContainerDispatcher dispatcher = new TestContainerDispatcher();
+    final ContainerDispatcher dispatcher = createDispatcher(dn,
+        UUID.randomUUID(), conf);
     return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
         new ContainerController(new ContainerSet(), Maps.newHashMap()),
         caClient, null);
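
Reviewer note: createDispatcher() above builds a real HddsDispatcher against a mocked datanode state machine. The two stubs appear to be the whole mock surface these tests need, since the dispatcher resolves the datanode identity via context.getParent().getDatanodeDetails(). A condensed sketch of just that wiring (dd stands for any DatanodeDetails instance):

import org.mockito.Mockito;

// Stub only what HddsDispatcher actually touches: the parent state machine
// and, through it, the datanode identity. No running state machine needed.
DatanodeStateMachine stateMachine = Mockito.mock(DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(dd);
Mockito.when(context.getParent()).thenReturn(stateMachine);
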
@@ -161,7 +195,8 @@ static void runTestClientServerRatis(RpcType rpc, int numNodes)
         (pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf),
         XceiverClientRatis::newXceiverClientRatis,
         TestSecureContainerServer::newXceiverServerRatis,
-        (dn, p) -> RatisTestHelper.initXceiverServerRatis(rpc, dn, p));
+        (dn, p) -> RatisTestHelper.initXceiverServerRatis(rpc, dn, p),
+        (p) -> {});
   }
 
   static void runTestClientServer(
@@ -171,15 +206,14 @@ static void runTestClientServer(
           IOException> createClient,
       CheckedBiFunction<DatanodeDetails, OzoneConfiguration, XceiverServerSpi,
           IOException> createServer,
-      CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
+      CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer,
+      Consumer<Pipeline> stopServer)
       throws Exception {
     final List<XceiverServerSpi> servers = new ArrayList<>();
     XceiverClientSpi client = null;
-    String containerName = OzoneUtils.getRequestID();
+    final Pipeline pipeline =
+        ContainerTestHelper.createPipeline(numDatanodes);
     try {
-      final Pipeline pipeline =
-          ContainerTestHelper.createPipeline(numDatanodes);
-
       initConf.accept(pipeline, CONF);
 
       for (DatanodeDetails dn : pipeline.getNodes()) {
@@ -188,22 +222,28 @@ static void runTestClientServer(
         s.start();
         initServer.accept(dn, pipeline);
       }
-
       client = createClient.apply(pipeline, CONF);
       client.connect();
 
-      // Test 1: Test failure in request without block token.
+      long testContainerID = getTestContainerID();
+      BlockID testBlockID = getTestBlockID(testContainerID);
+
+      // Create the container.
+      ContainerProtocolCalls.createContainer(client, testContainerID, null);
+
+      // Test 1: Test putBlock failure without block token.
       final ContainerCommandRequestProto request =
-          getCreateContainerRequest(
-              getTestContainerID(), pipeline);
+          getPutBlockRequest(pipeline, null, getWriteChunkRequest(pipeline,
+              testBlockID, 1024, null).getWriteChunk());
       Assert.assertNotNull(request.getTraceID());
 
       XceiverClientSpi finalClient = client;
-      // Validation is different for grpc and ratis client.
-      if(client instanceof XceiverClientGrpc) {
-        LambdaTestUtils.intercept(SCMSecurityException.class, "Failed to" +
-            " authenticate with GRPC XceiverServer with Ozone block token",
-            () -> finalClient.sendCommand(request));
+      if (finalClient instanceof XceiverClientGrpc) {
+        ContainerCommandResponseProto resp =
+            finalClient.sendCommand(request);
+        assertNotEquals(resp.getResult(), ContainerProtos.Result.SUCCESS);
+        String msg = resp.getMessage();
+        assertTrue(msg, msg.contains("Block token verification failed"));
       } else {
         IOException e = LambdaTestUtils.intercept(IOException.class,
             () -> finalClient.sendCommand(request));
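
Reviewer note: the two branches above exist because the transports surface the same rejection differently. gRPC hands back a non-SUCCESS response carrying the verifier's message, while Ratis wraps the failure in an IOException on the client side. A hedged helper folding both checks together (assertTokenRejected is a hypothetical name, not part of the patch; it assumes it lives in this test class and reuses its imports):

// Asserts that a request lacking a valid block token is rejected on either
// transport, assuming the server message always contains the phrase below.
static void assertTokenRejected(XceiverClientSpi client,
    ContainerCommandRequestProto request) throws Exception {
  if (client instanceof XceiverClientGrpc) {
    // gRPC path: the server answers with an error response.
    ContainerCommandResponseProto resp = client.sendCommand(request);
    assertNotEquals(ContainerProtos.Result.SUCCESS, resp.getResult());
    assertTrue(resp.getMessage(),
        resp.getMessage().contains("Block token verification failed"));
  } else {
    // Ratis path: the failure surfaces as an IOException on the client;
    // depending on wrapping, the text may sit in a nested cause.
    IOException e = LambdaTestUtils.intercept(IOException.class,
        () -> client.sendCommand(request));
    String msg = e.getMessage();
    assertTrue(msg, msg.contains("Block token verification failed"));
  }
}
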
@@ -212,80 +252,33 @@
         assertTrue(msg, msg.contains("Block token verification failed"));
       }
 
-      // Test 2: Test success in request with valid block token.
+      // Test 2: Test putBlock succeeded with valid block token.
       long expiryTime = Time.monotonicNow() + 60 * 60 * 24;
-      String omCertSerialId = caClient.getCertificate().getSerialNumber().toString();
       OzoneBlockTokenSecretManager secretManager =
           new OzoneBlockTokenSecretManager(new SecurityConfig(CONF),
               expiryTime, omCertSerialId);
       secretManager.start(caClient);
-      Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken("1",
+      Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken(
+          testBlockID.getContainerBlockID().toString(),
           EnumSet.allOf(AccessModeProto.class), RandomUtils.nextLong());
+
       final ContainerCommandRequestProto request2 =
-          ContainerTestHelper
-              .getCreateContainerSecureRequest(
-                  getTestContainerID(), pipeline,
-                  token);
+          getPutBlockRequest(pipeline, token.encodeToUrlString(),
+              getWriteChunkRequest(pipeline, testBlockID, 1024,
+                  token.encodeToUrlString()).getWriteChunk());
       Assert.assertNotNull(request2.getTraceID());
-      XceiverClientSpi finalClient2 = createClient.apply(pipeline, CONF);
-      if(finalClient2 instanceof XceiverClientGrpc) {
-        finalClient2.connect(token.encodeToUrlString());
-      } else {
-        finalClient2.connect();
-      }
-
-      ContainerCommandRequestProto request3 = getCreateContainerRequest(
-          getTestContainerID(), pipeline, token);
-      ContainerCommandResponseProto resp = finalClient2.sendCommand(request3);
-      assertEquals(SUCCESS, resp.getResult());
+      ContainerCommandResponseProto resp2 = finalClient.sendCommand(request2);
+      assertEquals(SUCCESS, resp2.getResult());
     } finally {
       if (client != null) {
         client.close();
       }
+      if (pipeline != null) {
+        stopServer.accept(pipeline);
+      }
       servers.stream().forEach(XceiverServerSpi::stop);
     }
   }
-
-  private static class TestContainerDispatcher implements ContainerDispatcher {
-    /**
-     * Dispatches commands to container layer.
-     *
-     * @param msg - Command Request
-     * @return Command Response
-     */
-    @Override
-    public ContainerCommandResponseProto dispatch(
-        ContainerCommandRequestProto msg,
-        DispatcherContext context) {
-      return ContainerTestHelper.getCreateContainerResponse(msg);
-    }
-
-    @Override
-    public void init() {
-    }
-
-    @Override
-    public void validateContainerCommand(
-        ContainerCommandRequestProto msg) throws StorageContainerException {
-    }
-
-    @Override
-    public void shutdown() {
-    }
-
-    @Override
-    public Handler getHandler(ContainerProtos.ContainerType containerType) {
-      return null;
-    }
-
-    @Override
-    public void setScmId(String scmId) {
-    }
-
-    @Override
-    public void buildMissingContainerSetAndValidate(
-        Map<Long, Long> container2BCSIDMap) {
-    }
-  }
 }
\ No newline at end of file
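
Reviewer note: Test 2 above exercises the full token round trip: the OM-side secret manager mints a token scoped to a single block, the client URL-encodes it into each chunk and block request, and the datanode verifier later matches it against the block ID extracted from the command. A condensed sketch using only calls that appear in this patch (blockID, pipeline, and maxLength stand in for test values):

// Mint a token bound to exactly one block, then embed its URL encoding in
// the write-path requests for that block.
Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken(
    blockID.getContainerBlockID().toString(),  // token is scoped to this block
    EnumSet.allOf(AccessModeProto.class),      // grant read and write access
    maxLength);
String encoded = token.encodeToUrlString();    // travels inside each request
ContainerCommandRequestProto putBlock =
    getPutBlockRequest(pipeline, encoded,
        getWriteChunkRequest(pipeline, blockID, 1024, encoded).getWriteChunk());
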
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
index db3e7bdecc65..b19020f96371 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -98,7 +98,7 @@ public void tesGetCommittedBlockLength() throws Exception {
     ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
         ContainerTestHelper
             .getWriteChunkRequest(container.getPipeline(), blockID,
-                data.length);
+                data.length, null);
     client.sendCommand(writeChunkRequest);
     // Now, explicitly make a putKey request for the block.
     ContainerProtos.ContainerCommandRequestProto putKeyRequest =
@@ -156,7 +156,7 @@ public void tesPutKeyResposne() throws Exception {
     ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
         ContainerTestHelper
            .getWriteChunkRequest(container.getPipeline(), blockID,
-                data.length);
+                data.length, null);
     client.sendCommand(writeChunkRequest);
     // Now, explicitly make a putKey request for the block.
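
Reviewer note: the same mechanical change as the earlier call sites; the new last argument of getWriteChunkRequest is the URL-encoded block token, and null keeps the request token-free for these non-secure tests. On a secured cluster the same call would carry a real token; encodedToken below is a placeholder for a value minted as in TestSecureContainerServer:

// Sketch only: pass a real encoded block token instead of null when block
// tokens are enabled on the cluster.
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
    ContainerTestHelper.getWriteChunkRequest(
        container.getPipeline(), blockID, data.length, encodedToken);
client.sendCommand(writeChunkRequest);
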
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java
index ea2d46a66ab3..d38e8058f2dc 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java
@@ -20,11 +20,15 @@
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
+import org.apache.hadoop.hdds.security.token.BlockTokenException;
+import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
+import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -57,12 +61,14 @@ public class TestOzoneBlockTokenSecretManager {
   private CertificateClient client;
   private static final String BASEDIR = GenericTestUtils
       .getTempPath(TestOzoneBlockTokenSecretManager.class.getSimpleName());
+  private BlockTokenVerifier tokenVerifier;
 
   @Before
   public void setUp() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, BASEDIR);
+    conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);
     // Create Ozone Master key pair.
     keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
     expiryTime = Time.monotonicNow() + 60 * 60 * 24;
@@ -76,6 +82,8 @@ public void setUp() throws Exception {
     client = getCertificateClient(securityConfig);
     client.init();
     secretManager.start(client);
+    tokenVerifier = new BlockTokenVerifier(securityConfig, client);
+
   }
 
   private CertificateClient getCertificateClient(SecurityConfig secConf)
@@ -86,6 +94,12 @@ public X509Certificate getCertificate() {
         return x509Certificate;
       }
 
+      @Override
+      public X509Certificate getCertificate(String certSerialId)
+          throws CertificateException {
+        return x509Certificate;
+      }
+
       @Override
       public PrivateKey getPrivateKey() {
         return keyPair.getPrivate();
@@ -183,4 +197,38 @@ public void testVerifySignatureFailure() throws Exception {
         () -> secretManager.verifySignature(id,
             client.signData(id.getBytes())));
   }
+
+  @Test
+  public void testBlockTokenVerifier() throws Exception {
+    String tokenBlockID = "101";
+    Token<OzoneBlockTokenIdentifier> token =
+        secretManager.generateToken("testUser", tokenBlockID,
+            EnumSet.allOf(AccessModeProto.class), 100);
+    OzoneBlockTokenIdentifier btIdentifier =
+        OzoneBlockTokenIdentifier.readFieldsProtobuf(new DataInputStream(
+            new ByteArrayInputStream(token.getIdentifier())));
+
+    // Check basic details.
+    Assert.assertTrue(btIdentifier.getOwnerId().equals("testUser"));
+    Assert.assertTrue(btIdentifier.getBlockId().equals("101"));
+    Assert.assertTrue(btIdentifier.getAccessModes().equals(EnumSet
+        .allOf(AccessModeProto.class)));
+    Assert.assertTrue(btIdentifier.getOmCertSerialId().equals(omCertSerialId));
+
+    validateHash(token.getPassword(), btIdentifier.getBytes());
+
+    tokenVerifier.verify("testUser", token.encodeToUrlString(),
+        ContainerProtos.Type.PutBlock, "101");
+
+    String notAllowedBlockID = "NotAllowedBlockID";
+    LambdaTestUtils.intercept(BlockTokenException.class,
+        "Token for block ID: " + tokenBlockID +
+            " can't be used to access block: " + notAllowedBlockID,
+        () -> tokenVerifier.verify("testUser", token.encodeToUrlString(),
+            ContainerProtos.Type.PutBlock, notAllowedBlockID));
+
+    // Non-block operations are not checked by the block token verifier.
+    tokenVerifier.verify(null, null,
+        ContainerProtos.Type.CloseContainer, null);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
index c05ecb966bcd..28da1346a5e0 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
@@ -116,7 +116,7 @@ public void initialize() throws IOException {
           containerType, conf, context, containerSet, volumeSet, metrics));
     }
     dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, handlers,
-        context, metrics);
+        context, metrics, null);
     dispatcher.init();
 
     containerCount = new AtomicInteger();
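
Reviewer note: the benchmark deliberately passes a null verifier so its dispatch hot path stays free of token checks. For reference, the verifier contract that testBlockTokenVerifier pins down, condensed into a sketch that reuses the test's own values:

// A token minted for block "101" verifies for that block only.
tokenVerifier.verify("testUser", token.encodeToUrlString(),
    ContainerProtos.Type.PutBlock, "101");
// Any other block ID makes verify() throw BlockTokenException, and
// non-block commands such as CloseContainer skip verification entirely,
// so null arguments are accepted.
tokenVerifier.verify(null, null, ContainerProtos.Type.CloseContainer, null);
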