Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ public final class HddsConfigKeys {
"hdds.x509.ca.rotation.ack.timeout";
public static final String HDDS_X509_CA_ROTATION_ACK_TIMEOUT_DEFAULT =
"PT15M";
public static final String HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL =
"hdds.x509.rootca.certificate.polling.interval";
public static final String
HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL_DEFAULT = "PT2h";

public static final String HDDS_CONTAINER_REPLICATION_COMPRESSION =
"hdds.container.replication.compression";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_CA_ROTATION_TIME_OF_DAY_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_FILE;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_FILE_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_PRIVATE_KEY_FILE;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_PRIVATE_KEY_FILE_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_PUBLIC_KEY_FILE;
Expand Down Expand Up @@ -131,6 +133,7 @@ public class SecurityConfig {
Pattern.compile("\\d{2}:\\d{2}:\\d{2}");
private final Duration caAckTimeout;
private final SslProvider grpcSSLProvider;
private final Duration rootCaCertificatePollingInterval;

/**
* Constructs a SecurityConfig.
Expand Down Expand Up @@ -228,6 +231,13 @@ public SecurityConfig(ConfigurationSource configuration) {

validateCertificateValidityConfig();

String rootCaCertificatePollingIntervalString = configuration.get(
HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL,
HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL_DEFAULT);

this.rootCaCertificatePollingInterval =
Duration.parse(rootCaCertificatePollingIntervalString);

this.externalRootCaCert = configuration.get(
HDDS_X509_ROOTCA_CERTIFICATE_FILE,
HDDS_X509_ROOTCA_CERTIFICATE_FILE_DEFAULT);
Expand Down Expand Up @@ -552,6 +562,10 @@ public Duration getCaAckTimeout() {
return caAckTimeout;
}

public Duration getRootCaCertificatePollingInterval() {
return rootCaCertificatePollingInterval;
}

/**
* Return true if using test certificates with authority as localhost. This
* should be used only for unit test where certificates are generated by
Expand Down
11 changes: 11 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,17 @@
is failed. Default is 15 minutes.
</description>
</property>
<property>
<name>hdds.x509.rootca.certificate.polling.interval</name>
<value>PT2h</value>
<description>Interval to use for polling in certificate clients for a new
root ca certificate. Every time the specified time duration elapses,
the clients send a request to the SCMs to see if a new root ca
certificate was generated. Once there is a change, the system
automatically adds the new root ca to the clients'
trust stores and requests a new certificate to be signed.
</description>
</property>
<property>
<name>ozone.scm.security.handler.count.key</name>
<value>2</value>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.security.x509.certificate.client;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.math.BigInteger;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Poller mechanism for Root Ca Rotation for clients.
*/
public class RootCaRotationPoller implements Runnable, Closeable {

private static final Logger LOG =
LoggerFactory.getLogger(RootCaRotationPoller.class);
private final List<Function<List<X509Certificate>, CompletableFuture<Void>>>
rootCARotationProcessors;
private final ScheduledExecutorService poller;
private final Duration pollingInterval;
private Set<X509Certificate> knownRootCerts;
private final SCMSecurityProtocolClientSideTranslatorPB scmSecureClient;

public RootCaRotationPoller(SecurityConfig securityConfig,
Set<X509Certificate> initiallyKnownRootCaCerts,
SCMSecurityProtocolClientSideTranslatorPB scmSecureClient) {
this.scmSecureClient = scmSecureClient;
this.knownRootCerts = initiallyKnownRootCaCerts;
poller = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat(
this.getClass().getSimpleName())
.setDaemon(true).build());
pollingInterval = securityConfig.getRootCaCertificatePollingInterval();
rootCARotationProcessors = new ArrayList<>();
}

private void pollRootCas() {
try {
List<String> pemEncodedRootCaList =
scmSecureClient.getAllRootCaCertificates();
List<X509Certificate> rootCAsFromSCM =
OzoneSecurityUtil.convertToX509(pemEncodedRootCaList);
List<X509Certificate> scmCertsWithoutKnownCerts
= new ArrayList<>(rootCAsFromSCM);
scmCertsWithoutKnownCerts.removeAll(knownRootCerts);
if (scmCertsWithoutKnownCerts.isEmpty()) {
return;
}
LOG.info("Some root CAs are not known to the client out of the root " +
"CAs known to the SCMs. Root CA Cert ids known to the client: " +
getPrintableCertIds(knownRootCerts) + ". Root CA Cert ids from " +
"SCM not known by the client: " +
getPrintableCertIds(scmCertsWithoutKnownCerts));

CompletableFuture<Void> allRootCAProcessorFutures =
CompletableFuture.allOf(rootCARotationProcessors.stream()
.map(c -> c.apply(rootCAsFromSCM))
.toArray(CompletableFuture[]::new));

allRootCAProcessorFutures.whenComplete((unused, throwable) -> {
if (throwable == null) {
knownRootCerts = new HashSet<>(rootCAsFromSCM);
}
});
} catch (IOException e) {
LOG.error("Error while trying to poll root ca certificate", e);
}
}

public void addRootCARotationProcessor(
Function<List<X509Certificate>, CompletableFuture<Void>> processor) {
rootCARotationProcessors.add(processor);
}

@Override
public void run() {
poller.scheduleAtFixedRate(this::pollRootCas, 0,
pollingInterval.getSeconds(), TimeUnit.SECONDS);
}

@Override
public void close() {
executorServiceShutdownGraceful(poller);
}

private void executorServiceShutdownGraceful(ExecutorService executor) {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("{} couldn't be shut down gracefully",
getClass().getSimpleName());
}
} catch (InterruptedException e) {
LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());
Thread.currentThread().interrupt();
}
}

private String getPrintableCertIds(Collection<X509Certificate> certs) {
return certs.stream()
.map(X509Certificate::getSerialNumber)
.map(BigInteger::toString)
.collect(Collectors.joining(", "));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.security.x509.certificate.utils;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.RootCaRotationPoller;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.ozone.test.GenericTestUtils;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL;

/**
* Test for Root Ca Rotation polling mechanism on client side.
*/
public class TestRootCaRotationPoller {

private SecurityConfig secConf;

@Mock
private SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient;

@BeforeEach
public void setup() {
MockitoAnnotations.openMocks(this);
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL, "PT1s");
secConf = new SecurityConfig(conf);
}

@Test
public void testPollerDoesNotInvokeRootCaProcessor() throws Exception {
X509Certificate knownCert = generateX509Cert(
LocalDateTime.now(), Duration.ofSeconds(50));
HashSet<X509Certificate> knownCerts = new HashSet<>();
knownCerts.add(knownCert);
List<String> certsFromScm = new ArrayList<>();
certsFromScm.add(CertificateCodec.getPEMEncodedString(knownCert));
RootCaRotationPoller poller = new RootCaRotationPoller(secConf,
knownCerts, scmSecurityClient);

Mockito.when(scmSecurityClient.getAllRootCaCertificates())
.thenReturn(certsFromScm);
AtomicBoolean atomicBoolean = new AtomicBoolean();
atomicBoolean.set(false);
poller.addRootCARotationProcessor(
certificates -> CompletableFuture.supplyAsync(() -> {
atomicBoolean.set(true);
Assertions.assertEquals(certificates.size(), 2);
return null;
}));
poller.run();
Assertions.assertThrows(TimeoutException.class, () ->
GenericTestUtils.waitFor(atomicBoolean::get, 50, 5000));
}

@Test
public void testPollerInvokesRootCaProcessors() throws Exception {
X509Certificate knownCert = generateX509Cert(
LocalDateTime.now(), Duration.ofSeconds(50));
X509Certificate newRootCa = generateX509Cert(
LocalDateTime.now(), Duration.ofSeconds(50));
HashSet<X509Certificate> knownCerts = new HashSet<>();
knownCerts.add(knownCert);
List<String> certsFromScm = new ArrayList<>();
certsFromScm.add(CertificateCodec.getPEMEncodedString(knownCert));
certsFromScm.add(CertificateCodec.getPEMEncodedString(newRootCa));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you have small test by just adding knowCert to ensure both cases are fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you meant by this, but I have refined the first test case to use the knownCert instead of not using certs at all. I hope this fixes it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I meant the same. Thanks for updating it.

RootCaRotationPoller poller = new RootCaRotationPoller(secConf,
knownCerts, scmSecurityClient);
poller.run();
Mockito.when(scmSecurityClient.getAllRootCaCertificates())
.thenReturn(certsFromScm);
AtomicBoolean atomicBoolean = new AtomicBoolean();
atomicBoolean.set(false);
poller.addRootCARotationProcessor(
certificates -> CompletableFuture.supplyAsync(() -> {
atomicBoolean.set(true);
Assertions.assertEquals(certificates.size(), 2);
return null;
}));
GenericTestUtils.waitFor(atomicBoolean::get, 50, 5000);
}

private X509Certificate generateX509Cert(
LocalDateTime startDate, Duration certLifetime) throws Exception {
KeyPair keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
LocalDateTime start = startDate == null ? LocalDateTime.now() : startDate;
LocalDateTime end = start.plus(certLifetime);
return new JcaX509CertificateConverter().getCertificate(
SelfSignedCertificate.newBuilder().setBeginDate(start)
.setEndDate(end).setClusterID("cluster").setKey(keyPair)
.setSubject("localhost").setConfiguration(secConf).setScmID("test")
.build());
}
}