Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public interface CertificateStore {

/**
* Writes a new certificate that was issued to the persistent store.
*
* Note: Don't rename this method, as it is used in
* SCMHAInvocationHandler#invokeRatis. If for any case renaming this
* method name is required, change it over there.
*
* @param serialID - Certificate Serial Number.
* @param certificate - Certificate to persist.
* @param role - OM/DN/SCM.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,18 @@
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.util.TimeDuration;
import org.bouncycastle.cert.X509CertificateHolder;
import org.bouncycastle.pkcs.PKCS10CertificationRequest;
import org.slf4j.Logger;
Expand All @@ -46,7 +55,9 @@
import java.security.KeyPair;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType.SCM;
import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
Expand Down Expand Up @@ -286,24 +297,81 @@ private static void persistSubCACertificate(OzoneConfiguration config,

/**
* Create Server TLS parameters required for Ratis Server.
* @param conf
* @param caClient
* @return

* @return Parameter map set with TLS config.
*/
public static Parameters createSCMServerTlsParameters(SecurityConfig conf,
CertificateClient caClient) {
public static Parameters createSCMServerTlsParameters(
GrpcTlsConfig grpcTlsConfig) {
Parameters parameters = new Parameters();

if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
GrpcTlsConfig config = new GrpcTlsConfig(
caClient.getPrivateKey(), caClient.getCertificate(),
caClient.getCACertificate(), true);
GrpcConfigKeys.Server.setTlsConf(parameters, config);
GrpcConfigKeys.Admin.setTlsConf(parameters, config);
GrpcConfigKeys.Client.setTlsConf(parameters, config);
GrpcConfigKeys.TLS.setConf(parameters, config);
if (grpcTlsConfig != null) {
GrpcConfigKeys.Server.setTlsConf(parameters, grpcTlsConfig);
GrpcConfigKeys.Admin.setTlsConf(parameters, grpcTlsConfig);
GrpcConfigKeys.Client.setTlsConf(parameters, grpcTlsConfig);
GrpcConfigKeys.TLS.setConf(parameters, grpcTlsConfig);
}

return parameters;
}

/**
* Create GrpcTlsConfig.
* @param conf
* @param certificateClient
* @return
*/
public static GrpcTlsConfig createSCMRatisTLSConfig(SecurityConfig conf,
CertificateClient certificateClient) {
if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
return new GrpcTlsConfig(
certificateClient.getPrivateKey(), certificateClient.getCertificate(),
certificateClient.getCACertificate(), true);
}
return null;
}

/**
* Submit SCM certs request to ratis using RaftClient.
* @param raftGroup
* @param tlsConfig
* @param message
* @return SCMRatisResponse.
* @throws Exception
*/
public static SCMRatisResponse submitScmCertsToRatis(RaftGroup raftGroup,
GrpcTlsConfig tlsConfig, Message message) throws Exception {
final RaftProperties properties = new RaftProperties();

// TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later.
RaftConfigKeys.Rpc.setType(properties, RpcType.valueOf("GRPC"));


// For now not making anything configurable, RaftClient is only used
// in SCM for DB updates of sub-ca certs go via Ratis.
RaftClient.Builder builder = RaftClient.newBuilder()
.setRaftGroup(raftGroup)
.setLeaderId(null)
.setProperties(properties)
.setRetryPolicy(
RetryPolicies.retryUpToMaximumCountWithFixedSleep(15,
TimeDuration.valueOf(500, TimeUnit.MILLISECONDS)));

if (tlsConfig != null) {
Parameters parameters = new Parameters();
GrpcConfigKeys.Client.setTlsConf(parameters, tlsConfig);
builder.setParameters(parameters);
}

RaftClient raftClient = builder.build();

CompletableFuture<RaftClientReply> future =
raftClient.async().send(message);

RaftClientReply raftClientReply = future.get();

return SCMRatisResponse.decode(raftClientReply);

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
Expand Down Expand Up @@ -230,5 +231,10 @@ public SCMStateMachine getSCMStateMachine() {
public boolean addSCM(AddSCMRequest request) throws IOException {
return false;
}

@Override
public GrpcTlsConfig getGrpcTlsConfig() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.Method;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -89,11 +90,29 @@ private Object invokeRatis(Method method, Object[] args)
throws Exception {
long startTime = Time.monotonicNowNanos();
Preconditions.checkNotNull(ratisHandler);
final SCMRatisResponse response = ratisHandler.submitRequest(
SCMRatisRequest.of(requestType, method.getName(),
method.getParameterTypes(), args));
SCMRatisRequest scmRatisRequest = SCMRatisRequest.of(requestType,
method.getName(), method.getParameterTypes(), args);

// Scm Cert DB updates should use RaftClient.
// As rootCA which is primary SCM only can issues certificates to sub-CA.
// In case primary is not leader SCM, still sub-ca cert DB updates should go
// via ratis. So, in this special scenario we use RaftClient.
final SCMRatisResponse response;
if (method.getName().equals("storeValidCertificate") &&
args[args.length -1].equals(HddsProtos.NodeType.SCM)) {
response =
HASecurityUtils.submitScmCertsToRatis(
ratisHandler.getDivision().getGroup(),
ratisHandler.getGrpcTlsConfig(),
scmRatisRequest.encode());

} else {
response = ratisHandler.submitRequest(
scmRatisRequest);
}
LOG.info("Invoking method {} on target {}, cost {}us",
method, ratisHandler, (Time.monotonicNowNanos() - startTime) / 1000.0);

if (response.isSuccess()) {
return response.getResult();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;

Expand Down Expand Up @@ -57,4 +58,7 @@ SCMRatisResponse submitRequest(SCMRatisRequest request)
boolean addSCM(AddSCMRequest request) throws IOException;

SCMStateMachine getSCMStateMachine();

GrpcTlsConfig getGrpcTlsConfig();

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.util.Time;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
Expand All @@ -58,6 +59,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.scm.ha.HASecurityUtils.createSCMRatisTLSConfig;
import static org.apache.hadoop.hdds.scm.ha.HASecurityUtils.createSCMServerTlsParameters;

/**
* TODO.
*/
Expand All @@ -71,6 +75,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
private final ClientId clientId = ClientId.randomId();
private final AtomicLong callId = new AtomicLong();
private final RaftServer.Division division;
private final GrpcTlsConfig grpcTlsConfig;

// TODO: Refactor and remove ConfigurationSource and use only
// SCMHAConfiguration.
Expand All @@ -91,9 +96,10 @@ public class SCMRatisServerImpl implements SCMRatisServer {
// scm boots up, it has peer info embedded in the raft log and will
// trigger leader election.

Parameters parameters =
HASecurityUtils.createSCMServerTlsParameters(new SecurityConfig(conf),
scm.getScmCertificateClient());
grpcTlsConfig = createSCMRatisTLSConfig(new SecurityConfig(conf),
scm.getScmCertificateClient());
Parameters parameters = createSCMServerTlsParameters(grpcTlsConfig);

this.server = newRaftServer(scm.getScmId(), conf)
.setStateMachine(stateMachine)
.setGroup(RaftGroup.valueOf(groupId))
Expand All @@ -116,6 +122,11 @@ public static void initialize(String clusterId, String scmId,
}
}

@Override
public GrpcTlsConfig getGrpcTlsConfig() {
return grpcTlsConfig;
}

public static void reinitialize(String clusterId, String scmId,
SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
RaftServer server = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl,
@Override
public SCMSecurityResponse submitRequest(RpcController controller,
SCMSecurityRequest request) throws ServiceException {
if (!scm.checkLeader()) {
throw new ServiceException(scm.getScmHAManager()
.getRatisServer()
.triggerNotLeaderException());
// For request type GetSCMCertificate we don't need leader check. As
// primary SCM may not be leader SCM.
if (!request.getCmdType().equals(GetSCMCertificate)) {
if (!scm.checkLeader()) {
throw new ServiceException(scm.getScmHAManager()
.getRatisServer()
.triggerNotLeaderException());
}
}
return dispatcher.processRequest(request, this::processRequest,
request.getCmdType(), request.getTraceID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerStateManagerV2;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
import org.junit.Assert;
Expand Down Expand Up @@ -90,6 +91,11 @@ public boolean addSCM(AddSCMRequest request)
public SCMStateMachine getSCMStateMachine() {
return null;
}

@Override
public GrpcTlsConfig getGrpcTlsConfig() {
return null;
}
};
}

Expand Down Expand Up @@ -125,7 +131,7 @@ public void testReplicateAnnotationBasic() throws Throwable {
try {
certificateStore.storeValidCertificate(BigInteger.valueOf(100L),
KeyStoreTestUtil.generateCertificate("CN=Test", keyPair, 30,
"SHA256withRSA"), HddsProtos.NodeType.SCM);
"SHA256withRSA"), HddsProtos.NodeType.DATANODE);
Assert.fail("Cannot reach here: should have seen a IOException");
} catch (IOException ignore) {
Assert.assertNotNull(ignore.getMessage() != null);
Expand Down