Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hdds;

import javax.management.ObjectName;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
Expand All @@ -36,7 +37,16 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
Expand All @@ -48,6 +58,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED_DEFAULT;

import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -161,6 +173,27 @@ public static InetSocketAddress getScmAddressForBlockClients(
.orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
}

/**
* Create a scm security client.
*
* @return {@link ScmBlockLocationProtocol}
* @throws IOException
*/
public static SCMSecurityProtocol getScmSecurityClient(
OzoneConfiguration conf, InetSocketAddress address) throws IOException {
RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
ProtobufRpcEngine.class);
long scmVersion =
RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
new SCMSecurityProtocolClientSideTranslatorPB(
RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion,
address, UserGroupInformation.getCurrentUser(),
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
return scmSecurityClient;
}

/**
* Retrieve the hostname, trying the supplied config keys in order.
* Each config value may be absent, or if present in the format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,43 @@
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.bouncycastle.pkcs.PKCS10CertificationRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.security.KeyPair;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.UUID;

import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.CLUSTER_ID;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
import static org.apache.hadoop.util.ExitUtil.terminate;

/**
Expand All @@ -67,6 +82,8 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
private DatanodeDetails datanodeDetails;
private DatanodeStateMachine datanodeStateMachine;
private List<ServicePlugin> plugins;
private CertificateClient dnCertClient;
private String component;

/**
* Default constructor.
Expand Down Expand Up @@ -133,6 +150,10 @@ public static void main(String[] args) {
}
}

public static Logger getLogger() {
return LOG;
}

/**
* Starts HddsDatanode services.
*
Expand All @@ -158,13 +179,15 @@ public void start(Object service) {
.substring(0, 8));
LOG.info("HddsDatanodeService host:{} ip:{}", hostname, ip);
// Authenticate Hdds Datanode service if security is enabled
if (conf.getBoolean(OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY,
true)) {
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
component = "dn-" + datanodeDetails.getUuidString();

dnCertClient = new DNCertificateClient(new SecurityConfig(conf));

if (SecurityUtil.getAuthenticationMethod(conf).equals(
UserGroupInformation.AuthenticationMethod.KERBEROS)) {
LOG.debug("Ozone security is enabled. Attempting login for Hdds " +
"Datanode user. "
+ "Principal: {},keytab: {}", conf.get(
LOG.info("Ozone security is enabled. Attempting login for Hdds " +
"Datanode user. Principal: {},keytab: {}", conf.get(
DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY),
conf.get(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY));

Expand All @@ -183,6 +206,9 @@ public void start(Object service) {
startPlugins();
// Starting HDDS Daemons
datanodeStateMachine.startDaemon();
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
initializeCertificateClient(conf);
}
} catch (IOException e) {
throw new RuntimeException("Can't start the HDDS datanode plugin", e);
} catch (AuthenticationException ex) {
Expand All @@ -192,6 +218,92 @@ public void start(Object service) {
}
}

/**
* Initializes secure Datanode.
* */
@VisibleForTesting
public void initializeCertificateClient(OzoneConfiguration config)
throws IOException {
LOG.info("Initializing secure Datanode.");

CertificateClient.InitResponse response = dnCertClient.init();
LOG.info("Init response: {}", response);
switch (response) {
case SUCCESS:
LOG.info("Initialization successful.");
break;
case GETCERT:
getSCMSignedCert(dnCertClient, config);
LOG.info("Successfully stored SCM signed certificate.");
break;
case FAILURE:
LOG.error("DN security initialization failed.");
throw new RuntimeException("DN security initialization failed.");
case RECOVER:
LOG.error("DN security initialization failed. OM certificate is " +
"missing.");
throw new RuntimeException("DN security initialization failed.");
default:
LOG.error("DN security initialization failed. Init response: {}",
response);
throw new RuntimeException("DN security initialization failed.");
}
}

/**
* Get SCM signed certificate and store it using certificate client.
* */
private void getSCMSignedCert(CertificateClient client,
OzoneConfiguration config) throws IOException {

for (EndpointStateMachine ep : datanodeStateMachine.getConnectionManager()
.getValues()) {
PKCS10CertificationRequest csr = getCSR(client, config, ep.getVersion());
SCMSecurityProtocol secureScmClient =
HddsUtils.getScmSecurityClient(config, ep.getAddress());

String pemEncodedCert = secureScmClient.getDataNodeCertificate(
datanodeDetails.getProtoBufMessage(), getEncodedString(csr));

try {
X509Certificate x509Certificate =
CertificateCodec.getX509Certificate(pemEncodedCert);
client.storeCertificate(x509Certificate);
} catch (IOException | CertificateException e) {
LOG.error("Error while storing SCM signed certificate.", e);
throw new RuntimeException(e);
}
}


}

/**
* Creates CSR for DN.
* */
@VisibleForTesting
public PKCS10CertificationRequest getCSR(CertificateClient client,
Configuration config, VersionResponse version) throws IOException {
CertificateSignRequest.Builder builder = client.getCSRBuilder();
KeyPair keyPair = new KeyPair(client.getPublicKey(),
client.getPrivateKey());

String hostname = InetAddress.getLocalHost().getCanonicalHostName();
String subject = UserGroupInformation.getCurrentUser()
.getShortUserName() + "@" + hostname;

builder.setCA(false)
.setKey(keyPair)
.setConfiguration(config)
.setScmID(version.getValue(SCM_ID))
.setClusterID(CLUSTER_ID)
.setSubject(subject);

LOG.info("Creating csr for DN-> subject:{},scmId:{},clusterId:{},",
subject, version.getValue(SCM_ID), version.getValue(CLUSTER_ID));
return builder.build();
}

/**
* Returns DatanodeDetails or null in case of Error.
*
Expand Down Expand Up @@ -308,4 +420,18 @@ public void close() {
}
}
}

@VisibleForTesting
public String getComponent() {
return component;
}

public CertificateClient getCertificateClient() {
return dnCertClient;
}

@VisibleForTesting
public void setCertificateClient(CertificateClient client) {
dnCertClient = client;
}
}
Loading