ozone.scm.security.handler.count.key
2
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/RootCaRotationPoller.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/RootCaRotationPoller.java
new file mode 100644
index 000000000000..47cc368bbefe
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/RootCaRotationPoller.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.x509.certificate.client;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.security.SecurityConfig;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Poller mechanism for Root Ca Rotation for clients.
+ */
+public class RootCaRotationPoller implements Runnable, Closeable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RootCaRotationPoller.class);
+ private final List, CompletableFuture>>
+ rootCARotationProcessors;
+ private final ScheduledExecutorService poller;
+ private final Duration pollingInterval;
+ private Set knownRootCerts;
+ private final SCMSecurityProtocolClientSideTranslatorPB scmSecureClient;
+
+ public RootCaRotationPoller(SecurityConfig securityConfig,
+ Set initiallyKnownRootCaCerts,
+ SCMSecurityProtocolClientSideTranslatorPB scmSecureClient) {
+ this.scmSecureClient = scmSecureClient;
+ this.knownRootCerts = initiallyKnownRootCaCerts;
+ poller = Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setNameFormat(
+ this.getClass().getSimpleName())
+ .setDaemon(true).build());
+ pollingInterval = securityConfig.getRootCaCertificatePollingInterval();
+ rootCARotationProcessors = new ArrayList<>();
+ }
+
+ private void pollRootCas() {
+ try {
+ List pemEncodedRootCaList =
+ scmSecureClient.getAllRootCaCertificates();
+ List rootCAsFromSCM =
+ OzoneSecurityUtil.convertToX509(pemEncodedRootCaList);
+ List scmCertsWithoutKnownCerts
+ = new ArrayList<>(rootCAsFromSCM);
+ scmCertsWithoutKnownCerts.removeAll(knownRootCerts);
+ if (scmCertsWithoutKnownCerts.isEmpty()) {
+ return;
+ }
+ LOG.info("Some root CAs are not known to the client out of the root " +
+ "CAs known to the SCMs. Root CA Cert ids known to the client: " +
+ getPrintableCertIds(knownRootCerts) + ". Root CA Cert ids from " +
+ "SCM not known by the client: " +
+ getPrintableCertIds(scmCertsWithoutKnownCerts));
+
+ CompletableFuture allRootCAProcessorFutures =
+ CompletableFuture.allOf(rootCARotationProcessors.stream()
+ .map(c -> c.apply(rootCAsFromSCM))
+ .toArray(CompletableFuture[]::new));
+
+ allRootCAProcessorFutures.whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ knownRootCerts = new HashSet<>(rootCAsFromSCM);
+ }
+ });
+ } catch (IOException e) {
+ LOG.error("Error while trying to poll root ca certificate", e);
+ }
+ }
+
+ public void addRootCARotationProcessor(
+ Function, CompletableFuture> processor) {
+ rootCARotationProcessors.add(processor);
+ }
+
+ @Override
+ public void run() {
+ poller.scheduleAtFixedRate(this::pollRootCas, 0,
+ pollingInterval.getSeconds(), TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void close() {
+ executorServiceShutdownGraceful(poller);
+ }
+
+ private void executorServiceShutdownGraceful(ExecutorService executor) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.warn("{} couldn't be shut down gracefully",
+ getClass().getSimpleName());
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private String getPrintableCertIds(Collection certs) {
+ return certs.stream()
+ .map(X509Certificate::getSerialNumber)
+ .map(BigInteger::toString)
+ .collect(Collectors.joining(", "));
+ }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCaRotationPoller.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCaRotationPoller.java
new file mode 100644
index 000000000000..7f2ed0a8135c
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCaRotationPoller.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.security.x509.certificate.utils;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.security.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.RootCaRotationPoller;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.ozone.test.GenericTestUtils;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.security.KeyPair;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL;
+
+/**
+ * Test for Root Ca Rotation polling mechanism on client side.
+ */
+public class TestRootCaRotationPoller {
+
+ private SecurityConfig secConf;
+
+ @Mock
+ private SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient;
+
+ @BeforeEach
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL, "PT1s");
+ secConf = new SecurityConfig(conf);
+ }
+
+ @Test
+ public void testPollerDoesNotInvokeRootCaProcessor() throws Exception {
+ X509Certificate knownCert = generateX509Cert(
+ LocalDateTime.now(), Duration.ofSeconds(50));
+ HashSet knownCerts = new HashSet<>();
+ knownCerts.add(knownCert);
+ List certsFromScm = new ArrayList<>();
+ certsFromScm.add(CertificateCodec.getPEMEncodedString(knownCert));
+ RootCaRotationPoller poller = new RootCaRotationPoller(secConf,
+ knownCerts, scmSecurityClient);
+
+ Mockito.when(scmSecurityClient.getAllRootCaCertificates())
+ .thenReturn(certsFromScm);
+ AtomicBoolean atomicBoolean = new AtomicBoolean();
+ atomicBoolean.set(false);
+ poller.addRootCARotationProcessor(
+ certificates -> CompletableFuture.supplyAsync(() -> {
+ atomicBoolean.set(true);
+ Assertions.assertEquals(certificates.size(), 2);
+ return null;
+ }));
+ poller.run();
+ Assertions.assertThrows(TimeoutException.class, () ->
+ GenericTestUtils.waitFor(atomicBoolean::get, 50, 5000));
+ }
+
+ @Test
+ public void testPollerInvokesRootCaProcessors() throws Exception {
+ X509Certificate knownCert = generateX509Cert(
+ LocalDateTime.now(), Duration.ofSeconds(50));
+ X509Certificate newRootCa = generateX509Cert(
+ LocalDateTime.now(), Duration.ofSeconds(50));
+ HashSet knownCerts = new HashSet<>();
+ knownCerts.add(knownCert);
+ List certsFromScm = new ArrayList<>();
+ certsFromScm.add(CertificateCodec.getPEMEncodedString(knownCert));
+ certsFromScm.add(CertificateCodec.getPEMEncodedString(newRootCa));
+ RootCaRotationPoller poller = new RootCaRotationPoller(secConf,
+ knownCerts, scmSecurityClient);
+ poller.run();
+ Mockito.when(scmSecurityClient.getAllRootCaCertificates())
+ .thenReturn(certsFromScm);
+ AtomicBoolean atomicBoolean = new AtomicBoolean();
+ atomicBoolean.set(false);
+ poller.addRootCARotationProcessor(
+ certificates -> CompletableFuture.supplyAsync(() -> {
+ atomicBoolean.set(true);
+ Assertions.assertEquals(certificates.size(), 2);
+ return null;
+ }));
+ GenericTestUtils.waitFor(atomicBoolean::get, 50, 5000);
+ }
+
+ private X509Certificate generateX509Cert(
+ LocalDateTime startDate, Duration certLifetime) throws Exception {
+ KeyPair keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
+ LocalDateTime start = startDate == null ? LocalDateTime.now() : startDate;
+ LocalDateTime end = start.plus(certLifetime);
+ return new JcaX509CertificateConverter().getCertificate(
+ SelfSignedCertificate.newBuilder().setBeginDate(start)
+ .setEndDate(end).setClusterID("cluster").setKey(keyPair)
+ .setSubject("localhost").setConfiguration(secConf).setScmID("test")
+ .build());
+ }
+}