diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
index 18d1be6b4338..5a95e1ffd732 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
@@ -289,7 +289,7 @@ public synchronized void close() throws IOException {
    for (Map.Entry<String, ProxyInfo<SCMSecurityProtocolPB>> proxy :
        scmProxies.entrySet()) {
      if (proxy.getValue() != null) {
-        RPC.stopProxy(proxy.getValue());
+        RPC.stopProxy(proxy.getValue().proxy);
      }
      scmProxies.remove(proxy.getKey());
    }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java
new file mode 100644
index 000000000000..a69522d14f3c
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSCMSecurityProtocolProxyProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.proxy;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Proxy provider for
+ * {@link org.apache.hadoop.hdds.protocol.SCMSecurityProtocol} against a single
+ * SCM node (no fail-over).
+ */
+public class SingleSCMSecurityProtocolProxyProvider
+    extends SCMSecurityProtocolFailoverProxyProvider {
+  private final String scmNodeId;
+
+  public SingleSCMSecurityProtocolProxyProvider(
+      ConfigurationSource conf,
+      UserGroupInformation userGroupInformation,
+      String scmNodeId) {
+    super(conf, userGroupInformation);
+    this.scmNodeId = scmNodeId;
+  }
+
+  @Override
+  public synchronized String getCurrentProxySCMNodeId() {
+    return scmNodeId;
+  }
+
+  @Override
+  public synchronized void performFailover(SCMSecurityProtocolPB currentProxy) {
+    // do nothing.
+  }
+
+  @Override
+  public synchronized void performFailoverToAssignedLeader(String newLeader,
+      Exception e) {
+    // do nothing.
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
index f7a481cc053f..8685a7fb5236 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
@@ -143,6 +143,10 @@ public List<ManagedSecretKey> getSortedKeys() {
     return state.getSortedKeys();
   }
 
+  public void reinitialize(List<ManagedSecretKey> secretKeys) {
+    state.reinitialize(secretKeys);
+  }
+
   private boolean shouldRotate(ManagedSecretKey currentKey) {
     Duration established = between(currentKey.getCreationTime(), Instant.now());
     return established.compareTo(rotationDuration) >= 0;
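
Note: taken together, SecretKeyManager.reinitialize() above and the
SecretKeyState.reinitialize() contract below give a follower SCM a way to
adopt the leader's key material directly, without going through the
Ratis-replicated updateKeys() path. A condensed sketch of the intended
composition (illustrative only; `scm` and `leaderNodeId` are assumed to be in
scope, and getSecretKeysFromLeader() is only added to SCMHAManager later in
this patch):

    // Pull the leader's current secret keys over RPC, then apply them
    // locally; the call returns null when symmetric secret keys are
    // disabled, in which case there is nothing to reinitialize.
    List<ManagedSecretKey> leaderKeys =
        scm.getScmHAManager().getSecretKeysFromLeader(leaderNodeId);
    if (leaderKeys != null) {
      scm.getSecretKeyManager().reinitialize(leaderKeys);
    }
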
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
index 7b510a10b250..43518b901a78 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyState.java
@@ -52,4 +52,9 @@ public interface SecretKeyState {
    */
   @Replicate
   void updateKeys(List<ManagedSecretKey> newKeys) throws TimeoutException;
+
+  /**
+   * Update SecretKeys from a snapshot received from the SCM leader.
+   */
+  void reinitialize(List<ManagedSecretKey> secretKeys);
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
index 727b005d2b4d..b3f0ae55d997 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
@@ -106,6 +106,10 @@ public List<ManagedSecretKey> getSortedKeys() {
    */
   @Override
   public void updateKeys(List<ManagedSecretKey> newKeys) {
+    updateKeysInternal(newKeys);
+  }
+
+  private void updateKeysInternal(List<ManagedSecretKey> newKeys) {
     LOG.info("Updating keys with {}", newKeys);
     lock.writeLock().lock();
     try {
@@ -127,4 +131,9 @@ public void updateKeys(List<ManagedSecretKey> newKeys) {
       lock.writeLock().unlock();
     }
   }
+
+  @Override
+  public void reinitialize(List<ManagedSecretKey> secretKeys) {
+    updateKeysInternal(secretKeys);
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 33d8c178c72b..c15d5b52dd96 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
 import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
+import org.apache.hadoop.hdds.scm.proxy.SingleSCMSecurityProtocolProxyProvider;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -446,6 +447,16 @@ public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
             UserGroupInformation.getCurrentUser()));
   }
 
+  /**
+   * Create an SCM security client that interacts with a specific SCM node.
+   */
+  public static SCMSecurityProtocolClientSideTranslatorPB
+      getScmSecurityClientSingleNode(ConfigurationSource conf, String scmNodeId,
+      UserGroupInformation ugi) throws IOException {
+    return new SCMSecurityProtocolClientSideTranslatorPB(
+        new SingleSCMSecurityProtocolProxyProvider(conf, ugi, scmNodeId));
+  }
+
   public static SCMSecurityProtocolClientSideTranslatorPB
       getScmSecurityClientWithMaxRetry(OzoneConfiguration conf,
       UserGroupInformation ugi) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index bc4748e3feef..b7a3fce3acb3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -19,10 +19,12 @@
 
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.ratis.server.protocol.TermIndex;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * SCMHAManager provides HA service for SCM.
@@ -78,6 +80,12 @@ public interface SCMHAManager {
    */
   DBCheckpoint downloadCheckpointFromLeader(String leaderId);
 
+  /**
+   * Get secret keys from the SCM leader.
+   */
+  List<ManagedSecretKey> getSecretKeysFromLeader(String leaderID)
+      throws IOException;
+
   /**
    * Verify the SCM DB checkpoint downloaded from leader.
    *
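
Note: the single-node proxy provider, the HddsServerUtil factory, and the new
SCMHAManager method above are designed to compose into a security client that
is pinned to one SCM instead of failing over. A minimal usage sketch
(illustrative only; "scm1" is a placeholder SCM node id, and the enclosing
method is assumed to declare IOException):

    // The try-with-resources closes the underlying RPC proxy.
    try (SCMSecurityProtocolClientSideTranslatorPB client =
        HddsServerUtil.getScmSecurityClientSingleNode(
            conf, "scm1", UserGroupInformation.getLoginUser())) {
      // Every call goes to "scm1"; performFailover() is a no-op, so the
      // provider never wanders off to another SCM node.
      List<ManagedSecretKey> keys = client.getAllSecretKeys();
    }
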
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 934c25e980ce..34a945d6d00e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -20,11 +20,15 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.security.SecretKeyManagerService;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.utils.HAUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -34,6 +38,7 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.hdds.ExitManager;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.FileUtils;
 import org.slf4j.Logger;
@@ -42,6 +47,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientSingleNode;
 
 /**
  * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
@@ -58,6 +66,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
 
   private final SCMRatisServer ratisServer;
   private final ConfigurationSource conf;
+  private final SecurityConfig securityConfig;
   private final DBTransactionBuffer transactionBuffer;
   private final SCMSnapshotProvider scmSnapshotProvider;
   private final StorageContainerManager scm;
@@ -70,8 +79,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
    * Creates SCMHAManager instance.
    */
   public SCMHAManagerImpl(final ConfigurationSource conf,
+      SecurityConfig securityConfig,
       final StorageContainerManager scm) throws IOException {
     this.conf = conf;
+    this.securityConfig = securityConfig;
     this.scm = scm;
     this.exitManager = new ExitManager();
     if (SCMHAUtils.isSCMHAEnabled(conf)) {
@@ -161,6 +172,21 @@ public DBCheckpoint downloadCheckpointFromLeader(String leaderId) {
     return dBCheckpoint;
   }
 
+  @Override
+  public List<ManagedSecretKey> getSecretKeysFromLeader(String leaderID)
+      throws IOException {
+    if (!SecretKeyManagerService.isSecretKeyEnable(securityConfig)) {
+      return null;
+    }
+
+    LOG.info("Getting secret keys from leader {}.", leaderID);
+    try (SCMSecurityProtocolClientSideTranslatorPB securityProtocol =
+        getScmSecurityClientSingleNode(conf, leaderID,
+            UserGroupInformation.getLoginUser())) {
+      return securityProtocol.getAllSecretKeys();
+    }
+  }
+
   @Override
   public TermIndex verifyCheckpointFromLeader(String leaderId,
       DBCheckpoint checkpoint) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
index c9c51b8a7ce9..fdb4c6e784e0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.ratis.grpc.GrpcTlsConfig;
@@ -136,6 +137,11 @@ public DBCheckpoint downloadCheckpointFromLeader(String leaderId) {
     return null;
   }
 
+  @Override
+  public List<ManagedSecretKey> getSecretKeysFromLeader(String leaderID) {
+    return null;
+  }
+
   @Override
   public TermIndex verifyCheckpointFromLeader(String leaderId,
       DBCheckpoint checkpoint) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 5e5298a96767..d9ac7b6d4b69 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Collection;
 import java.util.Optional;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -62,6 +64,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * The SCMStateMachine is the state machine for SCMRatisServer. It is
  * responsible for applying ratis committed transactions to
@@ -83,6 +87,7 @@ public class SCMStateMachine extends BaseStateMachine {
   // ensures serializable between notifyInstallSnapshotFromLeader()
   // and reinitialize().
   private DBCheckpoint installingDBCheckpoint = null;
+  private List<ManagedSecretKey> installingSecretKeys = null;
 
   private AtomicLong currentLeaderTerm = new AtomicLong(-1L);
   private AtomicBoolean refreshedAfterLeaderReady = new AtomicBoolean(false);
@@ -243,12 +248,23 @@ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
             return null;
           }
 
+          List<ManagedSecretKey> secretKeys;
+          try {
+            secretKeys =
+                scm.getScmHAManager().getSecretKeysFromLeader(leaderNodeId);
+            LOG.info("Got secret keys from leader {}", secretKeys);
+          } catch (IOException ex) {
+            LOG.error("Failed to get secret keys from SCM leader {}",
+                leaderNodeId, ex);
+            return null;
+          }
+
           TermIndex termIndex =
               scm.getScmHAManager().verifyCheckpointFromLeader(
                   leaderNodeId, checkpoint);
 
           if (termIndex != null) {
-            setInstallingDBCheckpoint(checkpoint);
+            setInstallingSnapshotData(checkpoint, secretKeys);
           }
           return termIndex;
         },
@@ -379,9 +395,11 @@ public void pause() {
   public void reinitialize() throws IOException {
     Preconditions.checkNotNull(installingDBCheckpoint);
     DBCheckpoint checkpoint = installingDBCheckpoint;
+    List<ManagedSecretKey> secretKeys = installingSecretKeys;
 
     // explicitly set installingDBCheckpoint to be null
     installingDBCheckpoint = null;
+    installingSecretKeys = null;
 
     TermIndex termIndex = null;
     try {
@@ -400,6 +418,10 @@ public void reinitialize() throws IOException {
       LOG.error("Failed to unpause ", ioe);
     }
 
+    if (secretKeys != null) {
+      requireNonNull(scm.getSecretKeyManager()).reinitialize(secretKeys);
+    }
+
     getLifeCycle().transition(LifeCycle.State.STARTING);
     getLifeCycle().transition(LifeCycle.State.RUNNING);
   }
@@ -423,8 +445,10 @@ public void close() throws IOException {
   }
 
   @VisibleForTesting
-  public void setInstallingDBCheckpoint(DBCheckpoint checkpoint) {
+  public void setInstallingSnapshotData(DBCheckpoint checkpoint,
+      List<ManagedSecretKey> secretKeys) {
     Preconditions.checkArgument(installingDBCheckpoint == null);
     installingDBCheckpoint = checkpoint;
+    installingSecretKeys = secretKeys;
   }
 }
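
Note: the SCMStateMachine changes above carry the secret keys through the
snapshot-install lifecycle. The follower-side sequence, condensed
(illustrative only; the real path also verifies the checkpoint with
verifyCheckpointFromLeader() and runs under Ratis's pause/reinitialize
control, as the tests below exercise):

    // 1. On the install-snapshot notification, fetch both artifacts from
    //    the leader: the current secret keys and the DB checkpoint.
    List<ManagedSecretKey> keys =
        scm.getScmHAManager().getSecretKeysFromLeader(leaderNodeId);
    DBCheckpoint checkpoint =
        scm.getScmHAManager().downloadCheckpointFromLeader(leaderNodeId);
    // 2. Stage them together so they are applied as a consistent pair.
    stateMachine.setInstallingSnapshotData(checkpoint, keys);
    // 3. Ratis pauses the state machine; reinitialize() then reloads the
    //    DB and, when keys != null, reinitializes the SecretKeyManager.
    stateMachine.reinitialize();
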
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 82d912f7d3b9..67554743bd0c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -614,7 +614,7 @@ private void initializeSystemManagers(OzoneConfiguration conf,
     if (configurator.getSCMHAManager() != null) {
       scmHAManager = configurator.getSCMHAManager();
     } else {
-      scmHAManager = new SCMHAManagerImpl(conf, this);
+      scmHAManager = new SCMHAManagerImpl(conf, securityConfig, this);
     }
 
     scmLayoutVersionManager = new HDDSLayoutVersionManager(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
index cd8a7d2e68e0..407ae946649e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
@@ -151,7 +151,7 @@ public void testInstallCheckPoint() throws Exception {
     SCMStateMachine sm =
         scm.getScmHAManager().getRatisServer().getSCMStateMachine();
     sm.pause();
-    sm.setInstallingDBCheckpoint(checkpoint);
+    sm.setInstallingSnapshotData(checkpoint, null);
     sm.reinitialize();
 
     Assert.assertNotNull(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index c1f172e1ae8f..97ff6eb987a3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -170,6 +170,12 @@ public StorageContainerManager getStorageContainerManager(int index) {
     return this.scmhaService.getServiceByIndex(index);
   }
 
+  public StorageContainerManager getScmLeader() {
+    return getStorageContainerManagers().stream()
+        .filter(StorageContainerManager::checkLeader)
+        .findFirst().orElse(null);
+  }
+
   private OzoneManager getOMLeader(boolean waitForLeaderElection)
       throws TimeoutException, InterruptedException {
     if (waitForLeaderElection) {
@@ -699,9 +705,7 @@ private void initSCMHAConfig() {
       conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
           "127.0.0.1:" + blockPort);
 
-      if (i <= numOfActiveSCMs) {
-        scmPorts.release(scmNodeId);
-      }
+      scmPorts.release(scmNodeId);
       scmRpcPorts.release(scmNodeId);
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
index 3b8ad5faf347..74868bee2af4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
@@ -279,8 +279,8 @@ public void testInstallCorruptedCheckpointFailure() throws Exception {
         s == LifeCycle.State.NEW || s.isPausingOrPaused());
 
     // Verify correct reloading
-    followerSM.setInstallingDBCheckpoint(
-        new RocksDBCheckpoint(checkpointBackup.toPath()));
+    followerSM.setInstallingSnapshotData(
+        new RocksDBCheckpoint(checkpointBackup.toPath()), null);
     followerSM.reinitialize();
     Assert.assertEquals(followerSM.getLastAppliedTermIndex(),
         leaderCheckpointTrxnInfo.getTermIndex());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSecretKeySnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSecretKeySnapshot.java
new file mode 100644
index 000000000000..410fc5bd653e
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSecretKeySnapshot.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.DefaultConfigManager;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMStateMachine;
+import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyManager;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_EXPIRY_DURATION;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_CHECK_DURATION;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_DURATION;
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test to verify that symmetric secret keys are correctly
+ * synchronized from leader to follower during snapshot installation.
+ */
+@Timeout(500)
+public final class TestSecretKeySnapshot {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestSecretKeySnapshot.class);
+  private static final long SNAPSHOT_THRESHOLD = 100;
+  private static final int LOG_PURGE_GAP = 100;
+  public static final int ROTATE_CHECK_DURATION_MS = 1_000;
+  public static final int ROTATE_DURATION_MS = 30_000;
+  public static final int EXPIRY_DURATION_MS = 61_000;
+
+  private MiniKdc miniKdc;
+  private OzoneConfiguration conf;
+  private File workDir;
+  private File ozoneKeytab;
+  private File spnegoKeytab;
+  private String host;
+  private String clusterId;
+  private String scmId;
+  private MiniOzoneHAClusterImpl cluster;
+
+  @BeforeEach
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
+
+    ExitUtils.disableSystemExit();
+
+    workDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+
+    startMiniKdc();
+    setSecureConfig();
+    createCredentialsInKDC();
+
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_ENABLED, true);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_GAP, LOG_PURGE_GAP);
+    conf.setLong(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD,
+        SNAPSHOT_THRESHOLD);
+
+    conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION,
+        ROTATE_CHECK_DURATION_MS + "ms");
+    conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, ROTATE_DURATION_MS + "ms");
+    conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, EXPIRY_DURATION_MS + "ms");
+
+    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setSCMServiceId("SCMServiceId")
+        .setNumDatanodes(1)
+        .setNumOfStorageContainerManagers(3)
+        .setNumOfActiveSCMs(2)
+        .setNumOfOzoneManagers(1);
+
+    cluster = (MiniOzoneHAClusterImpl) builder.build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterEach
+  public void stop() {
+    miniKdc.stop();
+    if (cluster != null) {
+      cluster.stop();
+    }
+    DefaultConfigManager.clearDefaultConfigs();
+  }
+
+  private void createCredentialsInKDC() throws Exception {
+    ScmConfig scmConfig = conf.getObject(ScmConfig.class);
+    SCMHTTPServerConfig httpServerConfig =
+        conf.getObject(SCMHTTPServerConfig.class);
+    createPrincipal(ozoneKeytab, scmConfig.getKerberosPrincipal());
+    createPrincipal(spnegoKeytab, httpServerConfig.getKerberosPrincipal());
+  }
+
+  private void createPrincipal(File keytab, String... principal)
+      throws Exception {
+    miniKdc.createPrincipal(keytab, principal);
+  }
+
+  private void startMiniKdc() throws Exception {
+    Properties securityProperties = MiniKdc.createConf();
+    miniKdc = new MiniKdc(securityProperties, workDir);
+    miniKdc.start();
+  }
+
+  private void setSecureConfig() throws IOException {
+    conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    host = InetAddress.getLocalHost().getCanonicalHostName()
+        .toLowerCase();
+
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.name());
+
+    String curUser = UserGroupInformation.getCurrentUser().getUserName();
+    conf.set(OZONE_ADMINISTRATORS, curUser);
+
+    String realm = miniKdc.getRealm();
+    String hostAndRealm = host + "@" + realm;
+    conf.set(HDDS_SCM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    conf.set(HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_SCM/" + hostAndRealm);
+    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+    conf.set(OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_OM/" + hostAndRealm);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm);
+
+    ozoneKeytab = new File(workDir, "scm.keytab");
+    spnegoKeytab = new File(workDir, "http.keytab");
+
+    conf.set(HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
+        ozoneKeytab.getAbsolutePath());
+    conf.set(HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY,
+        spnegoKeytab.getAbsolutePath());
+    conf.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
+        ozoneKeytab.getAbsolutePath());
+    conf.set(OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE,
+        spnegoKeytab.getAbsolutePath());
+    conf.set(DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY,
+        ozoneKeytab.getAbsolutePath());
+
+    conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
+  }
+
+  @Test
+  public void testInstallSnapshot() throws Exception {
+    // Get the leader SCM
+    StorageContainerManager leaderSCM = cluster.getScmLeader();
+    assertNotNull(leaderSCM);
+    // Find the inactive SCM
+    String followerId = cluster.getInactiveSCM().next().getSCMNodeId();
+
+    StorageContainerManager followerSCM = cluster.getSCM(followerId);
+
+    // Wait until the leader SCM has generated enough secret keys.
+    SecretKeyManager leaderSecretKeyManager = leaderSCM.getSecretKeyManager();
+    GenericTestUtils.waitFor(
+        () -> leaderSecretKeyManager.getSortedKeys().size() >= 2,
+        ROTATE_CHECK_DURATION_MS, EXPIRY_DURATION_MS);
+
+    writeToIncreaseLogIndex(leaderSCM, 200);
+    ManagedSecretKey currentKeyInLeader =
+        leaderSecretKeyManager.getCurrentSecretKey();
+
+    // Start the inactive SCM. Snapshot installation will happen as part
+    // of the setConfiguration() call to the Ratis leader, and the follower
+    // will catch up.
+    LOG.info("Starting follower...");
+    cluster.startInactiveSCM(followerId);
+
+    // The recently started SCM should be lagging behind the leader.
+    SCMStateMachine followerSM =
+        followerSCM.getScmHAManager().getRatisServer().getSCMStateMachine();
+
+    // Wait and retry for the follower to catch up to the leader's
+    // snapshot index.
+    // Time out if the follower does not catch up within 3s.
+    GenericTestUtils.waitFor(() ->
+        followerSM.getLastAppliedTermIndex().getIndex() >= 200,
+        100, 3000);
+    long followerLastAppliedIndex =
+        followerSM.getLastAppliedTermIndex().getIndex();
+    assertTrue(followerLastAppliedIndex >= 200);
+    assertFalse(followerSM.getLifeCycleState().isPausingOrPaused());
+
+    // Verify that the follower has the secret keys created
+    // while it was inactive.
+    SecretKeyManager followerSecretKeyManager =
+        followerSCM.getSecretKeyManager();
+    assertTrue(followerSecretKeyManager.isInitialized());
+    List<ManagedSecretKey> followerKeys =
+        followerSecretKeyManager.getSortedKeys();
+    LOG.info("Follower secret keys after snapshot: {}", followerKeys);
+    assertTrue(followerKeys.size() >= 2);
+    assertTrue(followerKeys.contains(currentKeyInLeader));
+    assertEquals(leaderSecretKeyManager.getSortedKeys(), followerKeys);
+
+    // Wait for the next rotation and assert that updates are synchronized
+    // normally after the snapshot.
+    ManagedSecretKey currentKeyPostSnapshot =
+        leaderSecretKeyManager.getCurrentSecretKey();
+    GenericTestUtils.waitFor(() ->
+        !leaderSecretKeyManager.getCurrentSecretKey()
+            .equals(currentKeyPostSnapshot),
+        ROTATE_CHECK_DURATION_MS, ROTATE_DURATION_MS);
+    assertEquals(leaderSecretKeyManager.getSortedKeys(),
+        followerSecretKeyManager.getSortedKeys());
+  }
+
+  private List<ContainerInfo> writeToIncreaseLogIndex(
+      StorageContainerManager scm, long targetLogIndex)
+      throws IOException, InterruptedException, TimeoutException {
+    List<ContainerInfo> containers = new ArrayList<>();
+    SCMStateMachine stateMachine =
+        scm.getScmHAManager().getRatisServer().getSCMStateMachine();
+    long logIndex = scm.getScmHAManager().getRatisServer().getSCMStateMachine()
+        .getLastAppliedTermIndex().getIndex();
+    while (logIndex <= targetLogIndex) {
+      containers.add(scm.getContainerManager()
+          .allocateContainer(
+              RatisReplicationConfig.getInstance(ReplicationFactor.ONE),
+              this.getClass().getName()));
+      Thread.sleep(100);
+      logIndex = stateMachine.getLastAppliedTermIndex().getIndex();
+    }
+    return containers;
+  }
+
+}