Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,15 +54,31 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{

private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub
client;
private final long timeout;

public InterSCMGrpcClient(final String host, final int leaderPort,
final ConfigurationSource conf) {
final int port = leaderPort;
final long timeout =
public InterSCMGrpcClient(final String host,
int port, final ConfigurationSource conf,
SCMCertificateClient scmCertificateClient) throws IOException {
Preconditions.checkNotNull(conf);
timeout =
conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval();
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(host, port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
SecurityConfig securityConfig = new SecurityConfig(conf);
if (securityConfig.isSecurityEnabled()
&& securityConfig.isGrpcTlsEnabled()) {
SslContextBuilder sslClientContextBuilder = SslContextBuilder.forClient();
sslClientContextBuilder.keyManager(scmCertificateClient.getPrivateKey(),
scmCertificateClient.getCertificate());
sslClientContextBuilder.trustManager(
scmCertificateClient.getCACertificate());
SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
sslClientContextBuilder, securityConfig.getGrpcSslProvider());
channelBuilder.sslContext(sslContextBuilder.build())
.useTransportSecurity();
}

channel = channelBuilder.build();
client = InterSCMProtocolServiceGrpc.newStub(channel).
withDeadlineAfter(timeout, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,6 +59,25 @@ public InterSCMGrpcProtocolService(final ConfigurationSource conf,

InterSCMGrpcService service = new InterSCMGrpcService(scm);
ServerBuilder b = nettyServerBuilder.addService(service);

SecurityConfig securityConfig = new SecurityConfig(conf);
if (securityConfig.isSecurityEnabled()
&& securityConfig.isGrpcTlsEnabled()) {
try {
SslContextBuilder sslServerContextBuilder =
SslContextBuilder.forServer(
scm.getScmCertificateClient().getPrivateKey(),
scm.getScmCertificateClient().getCertificate());
SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
sslServerContextBuilder, securityConfig.getGrpcSslProvider());
nettyServerBuilder.sslContext(sslContextBuilder.build());
} catch (Exception ex) {
LOG.error("Unable to setup TLS for secure " +
"InterSCMGrpcProtocolService GRPC endpoint.", ex);
throw new RuntimeException("Unable to setup TLS for secure " +
"InterSCMGrpcProtocolService GRPC endpoint.");
}
}
Preconditions.checkNotNull(b);
server = nettyServerBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public SCMHAManagerImpl(final ConfigurationSource conf,
this.ratisServer = new SCMRatisServerImpl(conf, scm,
(SCMHADBTransactionBuffer) transactionBuffer);
this.scmSnapshotProvider = new SCMSnapshotProvider(conf,
scm.getSCMHANodeDetails().getPeerNodeDetails());
scm.getSCMHANodeDetails().getPeerNodeDetails(),
scm.getScmCertificateClient());
grpcServer = new InterSCMGrpcProtocolService(conf, scm);
} else {
this.transactionBuffer = new SCMDBTransactionBufferImpl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;

Expand All @@ -57,10 +58,14 @@ public class SCMSnapshotProvider {

private Map<String, SCMNodeDetails> peerNodesMap;

private final SCMCertificateClient scmCertificateClient;

public SCMSnapshotProvider(ConfigurationSource conf,
List<SCMNodeDetails> peerNodes) {
List<SCMNodeDetails> peerNodes,
SCMCertificateClient scmCertificateClient) {
LOG.info("Initializing SCM Snapshot Provider");
this.conf = conf;
this.scmCertificateClient = scmCertificateClient;
// Create Ratis storage dir
String scmRatisDirectory = SCMHAUtils.getSCMRatisDirectory(conf);

Expand Down Expand Up @@ -101,12 +106,13 @@ public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID)
.getAbsolutePath();
File targetFile = new File(snapshotFilePath + ".tar.gz");


// the downloadClient instance will be created as and when install snapshot
// request is received. No caching of the client as it should be a very rare
int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort();
SCMSnapshotDownloader downloadClient = new InterSCMGrpcClient(
peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
port, conf);
port, conf, scmCertificateClient);
try {
downloadClient.download(targetFile.toPath()).get();
} catch (ExecutionException | InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ private StorageContainerManager(OzoneConfiguration conf,
}

private void initializeCertificateClient() {
if (scmStorageConfig.checkPrimarySCMIdInitialized()) {
if (OzoneSecurityUtil.isSecurityEnabled(configuration) &&
scmStorageConfig.checkPrimarySCMIdInitialized()) {
scmCertificateClient = new SCMCertificateClient(
new SecurityConfig(configuration),
scmStorageConfig.getScmCertSerialId());
Expand Down