Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public synchronized void close() throws IOException {
for (Map.Entry<String, ProxyInfo<SCMSecurityProtocolPB>> proxy :
scmProxies.entrySet()) {
if (proxy.getValue() != null) {
RPC.stopProxy(proxy.getValue());
RPC.stopProxy(proxy.getValue().proxy);
}
scmProxies.remove(proxy.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.proxy;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * A {@link SCMSecurityProtocolFailoverProxyProvider} for
 * {@link org.apache.hadoop.hdds.protocol.SCMSecurityProtocol} that stays on
 * one SCM node: the node id given at construction time is always reported as
 * the current one, and both fail-over hooks are no-ops.
 */
public class SingleSCMSecurityProtocolProxyProvider
    extends SCMSecurityProtocolFailoverProxyProvider {

  /** Id of the single SCM node this provider reports as current. */
  private final String scmNodeId;

  /**
   * Creates a provider bound to the given SCM node.
   *
   * @param conf configuration forwarded to the parent provider.
   * @param userGroupInformation user forwarded to the parent provider.
   * @param scmNodeId id of the SCM node to report as the current proxy node.
   */
  public SingleSCMSecurityProtocolProxyProvider(
      final ConfigurationSource conf,
      final UserGroupInformation userGroupInformation,
      final String scmNodeId) {
    super(conf, userGroupInformation);
    this.scmNodeId = scmNodeId;
  }

  /**
   * Intentionally empty: there is no other node to fail over to.
   *
   * @param currentProxy ignored.
   */
  @Override
  public synchronized void performFailover(
      final SCMSecurityProtocolPB currentProxy) {
    // do nothing.
  }

  /**
   * Intentionally empty: the provider never follows a newly assigned leader.
   *
   * @param newLeader ignored.
   * @param e ignored.
   */
  @Override
  public synchronized void performFailoverToAssignedLeader(
      final String newLeader, final Exception e) {
    // do nothing.
  }

  /**
   * @return the SCM node id passed to the constructor.
   */
  @Override
  public synchronized String getCurrentProxySCMNodeId() {
    return this.scmNodeId;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ public List<ManagedSecretKey> getSortedKeys() {
return state.getSortedKeys();
}

/**
 * Passes the given secret keys straight through to the underlying state's
 * {@code reinitialize}; no other work is done here.
 *
 * @param secretKeys keys handed to the state.
 */
public void reinitialize(final List<ManagedSecretKey> secretKeys) {
  this.state.reinitialize(secretKeys);
}

private boolean shouldRotate(ManagedSecretKey currentKey) {
Duration established = between(currentKey.getCreationTime(), Instant.now());
return established.compareTo(rotationDuration) >= 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public interface SecretKeyState {
*/
@Replicate
void updateKeys(List<ManagedSecretKey> newKeys) throws TimeoutException;

/**
* Update SecretKeys from a snapshot from SCM leader.
* <p>
* Unlike {@link #updateKeys(List)} declared above, this method carries no
* {@code @Replicate} annotation and declares no checked exception.
*
* @param secretKeys the secret keys taken from the leader's snapshot.
*/
void reinitialize(List<ManagedSecretKey> secretKeys);
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public List<ManagedSecretKey> getSortedKeys() {
*/
@Override
public void updateKeys(List<ManagedSecretKey> newKeys) {
// Thin entry point: the actual update lives in updateKeysInternal(), which
// reinitialize() in this class also calls.
updateKeysInternal(newKeys);
}

private void updateKeysInternal(List<ManagedSecretKey> newKeys) {
LOG.info("Updating keys with {}", newKeys);
lock.writeLock().lock();
try {
Expand All @@ -127,4 +131,9 @@ public void updateKeys(List<ManagedSecretKey> newKeys) {
lock.writeLock().unlock();
}
}

/**
 * Applies the given keys through the same internal update routine that
 * {@link #updateKeys(List)} uses.
 *
 * @param secretKeys keys passed on to the internal update.
 */
@Override
public void reinitialize(final List<ManagedSecretKey> secretKeys) {
  this.updateKeysInternal(secretKeys);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
import org.apache.hadoop.hdds.scm.proxy.SingleSCMSecurityProtocolProxyProvider;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
Expand Down Expand Up @@ -446,6 +447,16 @@ public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
UserGroupInformation.getCurrentUser()));
}

/**
 * Builds an SCM security client whose proxy provider is bound to one
 * specific SCM node.
 *
 * @param conf configuration passed to the proxy provider.
 * @param scmNodeId id of the SCM node the client should talk to.
 * @param ugi user passed to the proxy provider.
 * @return a new client-side translator backed by a
 *         {@link SingleSCMSecurityProtocolProxyProvider}.
 * @throws IOException declared by the signature; propagated from building
 *         the provider or the translator.
 */
public static SCMSecurityProtocolClientSideTranslatorPB
    getScmSecurityClientSingleNode(ConfigurationSource conf, String scmNodeId,
    UserGroupInformation ugi) throws IOException {
  final SingleSCMSecurityProtocolProxyProvider singleNodeProvider =
      new SingleSCMSecurityProtocolProxyProvider(conf, ugi, scmNodeId);
  return new SCMSecurityProtocolClientSideTranslatorPB(singleNodeProvider);
}

public static SCMSecurityProtocolClientSideTranslatorPB
getScmSecurityClientWithMaxRetry(OzoneConfiguration conf,
UserGroupInformation ugi) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.ratis.server.protocol.TermIndex;

import java.io.IOException;
import java.util.List;

/**
* SCMHAManager provides HA service for SCM.
Expand Down Expand Up @@ -78,6 +80,12 @@ public interface SCMHAManager {
*/
DBCheckpoint downloadCheckpointFromLeader(String leaderId);

/**
* Get secret keys from SCM leader.
*
* @param leaderID id of the SCM leader to fetch the secret keys from.
* @return the secret keys obtained from the leader; may be {@code null}, so
* callers must null-check the result.
* @throws IOException if the secret keys cannot be fetched from the leader.
*/
List<ManagedSecretKey> getSecretKeysFromLeader(String leaderID)
throws IOException;

/**
* Verify the SCM DB checkpoint downloaded from leader.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.security.SecretKeyManagerService;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.utils.HAUtils;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -34,6 +38,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.hdds.ExitManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
Expand All @@ -42,6 +47,9 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientSingleNode;

/**
* SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
Expand All @@ -58,6 +66,7 @@ public class SCMHAManagerImpl implements SCMHAManager {

private final SCMRatisServer ratisServer;
private final ConfigurationSource conf;
private final SecurityConfig securityConfig;
private final DBTransactionBuffer transactionBuffer;
private final SCMSnapshotProvider scmSnapshotProvider;
private final StorageContainerManager scm;
Expand All @@ -70,8 +79,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
* Creates SCMHAManager instance.
*/
public SCMHAManagerImpl(final ConfigurationSource conf,
SecurityConfig securityConfig,
final StorageContainerManager scm) throws IOException {
this.conf = conf;
this.securityConfig = securityConfig;
this.scm = scm;
this.exitManager = new ExitManager();
if (SCMHAUtils.isSCMHAEnabled(conf)) {
Expand Down Expand Up @@ -161,6 +172,21 @@ public DBCheckpoint downloadCheckpointFromLeader(String leaderId) {
return dBCheckpoint;
}

/**
 * Fetches all secret keys from the given SCM leader through a security
 * client bound to that single node.
 *
 * @param leaderID id of the SCM leader to query.
 * @return the keys reported by the leader, or {@code null} when the secret
 *         key feature is not enabled for this SCM's security config.
 * @throws IOException if obtaining the login user, creating the client,
 *         fetching the keys or closing the client fails.
 */
@Override
public List<ManagedSecretKey> getSecretKeysFromLeader(String leaderID)
    throws IOException {
  final boolean secretKeysEnabled =
      SecretKeyManagerService.isSecretKeyEnable(securityConfig);
  if (secretKeysEnabled) {
    LOG.info("Getting secret keys from leader {}.", leaderID);
    final UserGroupInformation loginUser =
        UserGroupInformation.getLoginUser();
    // The client is only needed for this one call; close it right after.
    try (SCMSecurityProtocolClientSideTranslatorPB leaderClient =
             getScmSecurityClientSingleNode(conf, leaderID, loginUser)) {
      return leaderClient.getAllSecretKeys();
    }
  }
  // Secret keys are disabled: nothing to fetch.
  return null;
}

@Override
public TermIndex verifyCheckpointFromLeader(String leaderId,
DBCheckpoint checkpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.ratis.grpc.GrpcTlsConfig;
Expand Down Expand Up @@ -136,6 +137,11 @@ public DBCheckpoint downloadCheckpointFromLeader(String leaderId) {
return null;
}

/**
 * Stub implementation: never contacts a leader.
 *
 * @param leaderID ignored.
 * @return always {@code null}.
 */
@Override
public List<ManagedSecretKey> getSecretKeysFromLeader(
    final String leaderID) {
  return null;
}

@Override
public TermIndex verifyCheckpointFromLeader(String leaderId,
DBCheckpoint checkpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import java.util.Optional;
Expand All @@ -37,6 +38,7 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
Expand All @@ -62,6 +64,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;

/**
* The SCMStateMachine is the state machine for SCMRatisServer. It is
* responsible for applying ratis committed transactions to
Expand All @@ -83,6 +87,7 @@ public class SCMStateMachine extends BaseStateMachine {
// ensures serializable between notifyInstallSnapshotFromLeader()
// and reinitialize().
private DBCheckpoint installingDBCheckpoint = null;
private List<ManagedSecretKey> installingSecretKeys = null;

private AtomicLong currentLeaderTerm = new AtomicLong(-1L);
private AtomicBoolean refreshedAfterLeaderReady = new AtomicBoolean(false);
Expand Down Expand Up @@ -243,12 +248,23 @@ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
return null;
}

List<ManagedSecretKey> secretKeys;
try {
secretKeys =
scm.getScmHAManager().getSecretKeysFromLeader(leaderNodeId);
LOG.info("Got secret keys from leaders {}", secretKeys);
} catch (IOException ex) {
LOG.error("Failed to get secret keys from SCM leader {}",
leaderNodeId, ex);
return null;
}

TermIndex termIndex =
scm.getScmHAManager().verifyCheckpointFromLeader(
leaderNodeId, checkpoint);

if (termIndex != null) {
setInstallingDBCheckpoint(checkpoint);
setInstallingSnapshotData(checkpoint, secretKeys);
}
return termIndex;
},
Expand Down Expand Up @@ -379,9 +395,11 @@ public void pause() {
public void reinitialize() throws IOException {
Preconditions.checkNotNull(installingDBCheckpoint);
DBCheckpoint checkpoint = installingDBCheckpoint;
List<ManagedSecretKey> secretKeys = installingSecretKeys;

// explicitly set installingDBCheckpoint to be null
installingDBCheckpoint = null;
installingSecretKeys = null;

TermIndex termIndex = null;
try {
Expand All @@ -400,6 +418,10 @@ public void reinitialize() throws IOException {
LOG.error("Failed to unpause ", ioe);
}

if (secretKeys != null) {
requireNonNull(scm.getSecretKeyManager()).reinitialize(secretKeys);
}

getLifeCycle().transition(LifeCycle.State.STARTING);
getLifeCycle().transition(LifeCycle.State.RUNNING);
}
Expand All @@ -423,8 +445,10 @@ public void close() throws IOException {
}

@VisibleForTesting
public void setInstallingDBCheckpoint(DBCheckpoint checkpoint) {
public void setInstallingSnapshotData(DBCheckpoint checkpoint,
List<ManagedSecretKey> secretKeys) {
Preconditions.checkArgument(installingDBCheckpoint == null);
installingDBCheckpoint = checkpoint;
installingSecretKeys = secretKeys;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ private void initializeSystemManagers(OzoneConfiguration conf,
if (configurator.getSCMHAManager() != null) {
scmHAManager = configurator.getSCMHAManager();
} else {
scmHAManager = new SCMHAManagerImpl(conf, this);
scmHAManager = new SCMHAManagerImpl(conf, securityConfig, this);
}

scmLayoutVersionManager = new HDDSLayoutVersionManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void testInstallCheckPoint() throws Exception {
SCMStateMachine sm =
scm.getScmHAManager().getRatisServer().getSCMStateMachine();
sm.pause();
sm.setInstallingDBCheckpoint(checkpoint);
sm.setInstallingSnapshotData(checkpoint, null);
sm.reinitialize();

Assert.assertNotNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ public StorageContainerManager getStorageContainerManager(int index) {
return this.scmhaService.getServiceByIndex(index);
}

/**
 * Looks up the SCM that currently reports itself as leader.
 *
 * @return the first SCM, in {@code getStorageContainerManagers()} order,
 *         whose {@code checkLeader()} returns true, or {@code null} when
 *         none does.
 */
public StorageContainerManager getScmLeader() {
  for (StorageContainerManager candidate : getStorageContainerManagers()) {
    if (candidate.checkLeader()) {
      return candidate;
    }
  }
  return null;
}

private OzoneManager getOMLeader(boolean waitForLeaderElection)
throws TimeoutException, InterruptedException {
if (waitForLeaderElection) {
Expand Down Expand Up @@ -699,9 +705,7 @@ private void initSCMHAConfig() {
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
"127.0.0.1:" + blockPort);

if (i <= numOfActiveSCMs) {
scmPorts.release(scmNodeId);
}
scmPorts.release(scmNodeId);
scmRpcPorts.release(scmNodeId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ public void testInstallCorruptedCheckpointFailure() throws Exception {
s == LifeCycle.State.NEW || s.isPausingOrPaused());

// Verify correct reloading
followerSM.setInstallingDBCheckpoint(
new RocksDBCheckpoint(checkpointBackup.toPath()));
followerSM.setInstallingSnapshotData(
new RocksDBCheckpoint(checkpointBackup.toPath()), null);
followerSM.reinitialize();
Assert.assertEquals(followerSM.getLastAppliedTermIndex(),
leaderCheckpointTrxnInfo.getTermIndex());
Expand Down
Loading