diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java index 44289f35ff28..d797bd0a11bd 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeInfo.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.List; import java.util.OptionalInt; +import net.jcip.annotations.Immutable; import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.ozone.ha.ConfUtils; @@ -52,15 +53,16 @@ * This class is used by SCM clients like OzoneManager, Client, Admin * commands to figure out SCM Node Information to make contact to SCM. */ +@Immutable public class SCMNodeInfo { private static final Logger LOG = LoggerFactory.getLogger(SCMNodeInfo.class); - private String serviceId; - private String nodeId; - private String blockClientAddress; - private String scmClientAddress; - private String scmSecurityAddress; - private String scmDatanodeAddress; + private final String serviceId; + private final String nodeId; + private final String blockClientAddress; + private final String scmClientAddress; + private final String scmSecurityAddress; + private final String scmDatanodeAddress; /** * Build SCM Node information from configuration. diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index 00e8e3e0ad22..13a8846f0423 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -98,7 +98,7 @@ public ScmBlockLocationProtocolClientSideTranslatorPB( this.failoverProxyProvider = proxyProvider; this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create( ScmBlockLocationProtocolPB.class, failoverProxyProvider, - failoverProxyProvider.getSCMBlockLocationRetryPolicy()); + failoverProxyProvider.getRetryPolicy()); } /** diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java index 70c31d05b291..60e69997599e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java @@ -17,283 +17,32 @@ package org.apache.hadoop.hdds.scm.proxy; -import static org.apache.hadoop.ozone.OzoneConsts.SCM_DUMMY_SERVICE_ID; - -import com.google.common.annotations.VisibleForTesting; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; -import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Failover proxy provider for SCM block location. */ -public class SCMBlockLocationFailoverProxyProvider implements - FailoverProxyProvider, Closeable { +public class SCMBlockLocationFailoverProxyProvider extends + SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class); - private final SCMClientConfig scmClientConfig; - - private final Map> scmProxies; - private final Map scmProxyInfoMap; - private List scmNodeIds; - - // As SCM Client is shared across threads, performFailOver() - // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is - // updated in shouldRetry(). When 2 or more threads run in parallel, the - // RetryInvocationHandler will check the expectedFailOverCount - // and not execute performFailOver() for one of them. So the other thread(s) - // shall not call performFailOver(), it will call getProxy() which uses - // currentProxySCMNodeId and returns the proxy. - private volatile String currentProxySCMNodeId; - private volatile int currentProxyIndex; - - private final ConfigurationSource conf; - private final long scmVersion; - - private String scmServiceId; - - private final int maxRetryCount; - private final long retryInterval; - - private final UserGroupInformation ugi; - - private String updatedLeaderNodeID = null; - public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) { - this.conf = conf; - this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class); - - try { - this.ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException ex) { - LOG.error("Unable to fetch user credentials from UGI", ex); - throw new RuntimeException(ex); - } - - // Set some constant for non-HA. - if (scmServiceId == null) { - scmServiceId = SCM_DUMMY_SERVICE_ID; - } - this.scmProxies = new HashMap<>(); - this.scmProxyInfoMap = new HashMap<>(); - - loadConfigs(); - - this.currentProxyIndex = 0; - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - - scmClientConfig = conf.getObject(SCMClientConfig.class); - this.maxRetryCount = scmClientConfig.getRetryCount(); - this.retryInterval = scmClientConfig.getRetryInterval(); - - LOG.info("Created block location fail-over proxy with {} nodes: {}", - scmNodeIds.size(), scmProxyInfoMap.values()); - } - - private synchronized void loadConfigs() { - - scmNodeIds = new ArrayList<>(); - List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); - - for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { - if (scmNodeInfo.getBlockClientAddress() == null) { - throw new ConfigurationException("SCM BlockClient Address could not " + - "be obtained from config. Config is not properly defined"); - } else { - InetSocketAddress scmBlockClientAddress = - NetUtils.createSocketAddr(scmNodeInfo.getBlockClientAddress()); - - scmServiceId = scmNodeInfo.getServiceId(); - String scmNodeId = scmNodeInfo.getNodeId(); - scmNodeIds.add(scmNodeId); - SCMProxyInfo scmProxyInfo = new SCMProxyInfo( - scmNodeInfo.getServiceId(), scmNodeInfo.getNodeId(), - scmBlockClientAddress); - scmProxyInfoMap.put(scmNodeId, scmProxyInfo); - } - } - } - - @VisibleForTesting - public synchronized void changeCurrentProxy(String nodeId) { - currentProxyIndex = scmNodeIds.indexOf(nodeId); - currentProxySCMNodeId = nodeId; - nextProxyIndex(); - } - - private synchronized String getCurrentProxySCMNodeId() { - return currentProxySCMNodeId; - } - - @Override - public synchronized ProxyInfo getProxy() { - String currentProxyNodeId = getCurrentProxySCMNodeId(); - ProxyInfo currentProxyInfo = - scmProxies.get(currentProxyNodeId); - if (currentProxyInfo == null) { - currentProxyInfo = createSCMProxy(currentProxyNodeId); - } - return currentProxyInfo; + super(ScmBlockLocationProtocolPB.class, conf, null); } @Override - public synchronized void performFailover( - ScmBlockLocationProtocolPB newLeader) { - //If leader node id is set, use that or else move to next proxy index. - if (updatedLeaderNodeID != null) { - currentProxySCMNodeId = updatedLeaderNodeID; - } else { - nextProxyIndex(); - } - - } - - public synchronized void performFailoverToAssignedLeader(String newLeader, - Exception e) { - ServerNotLeaderException snle = - (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); - if (snle != null && snle.getSuggestedLeader() != null) { - Optional matchedProxyInfo = - scmProxyInfoMap.values().stream().filter( - proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) - .equals(snle.getSuggestedLeader())).findFirst(); - if (matchedProxyInfo.isPresent()) { - newLeader = matchedProxyInfo.get().getNodeId(); - LOG.debug("Performing failover to suggested leader {}, nodeId {}", - snle.getSuggestedLeader(), newLeader); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Suggested leader {} does not match with any of the " + - "proxyInfo adress {}", snle.getSuggestedLeader(), - Arrays.toString(scmProxyInfoMap.values().toArray())); - } - } - } - assignLeaderToNode(newLeader); + protected Logger getLogger() { + return LOG; } @Override - public Class getInterface() { - return ScmBlockLocationProtocolPB.class; - } - - @Override - public synchronized void close() throws IOException { - for (ProxyInfo proxy : scmProxies.values()) { - ScmBlockLocationProtocolPB scmProxy = proxy.proxy; - if (scmProxy != null) { - RPC.stopProxy(scmProxy); - } - } - } - - private synchronized long getRetryInterval() { - // TODO add exponential backup - return retryInterval; - } - - private synchronized void nextProxyIndex() { - // round robin the next proxy - - currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size(); - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - } - - private synchronized void assignLeaderToNode(String newLeaderNodeId) { - if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { - if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { - updatedLeaderNodeID = newLeaderNodeId; - LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID); - } else { - updatedLeaderNodeID = null; - } - } - } - - /** - * Creates proxy object. - */ - private ProxyInfo createSCMProxy(String nodeId) { - ProxyInfo proxyInfo; - SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); - InetSocketAddress address = scmProxyInfo.getAddress(); - try { - ScmBlockLocationProtocolPB scmProxy = createSCMProxy(address); - // Create proxyInfo here, to make it work with all Hadoop versions. - proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); - scmProxies.put(nodeId, proxyInfo); - return proxyInfo; - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to SCM at {}", - this.getClass().getSimpleName(), address, ioe); - throw new RuntimeException(ioe); - } - } - - private ScmBlockLocationProtocolPB createSCMProxy( - InetSocketAddress scmAddress) throws IOException { - Configuration hadoopConf = - LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); - RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocolPB.class, - ProtobufRpcEngine.class); - // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same OM in case of connection exception. This retry - // policy essentially results in TRY_ONCE_THEN_FAIL. - RetryPolicy connectionRetryPolicy = RetryPolicies - .failoverOnNetworkException(0); - return RPC.getProtocolProxy(ScmBlockLocationProtocolPB.class, scmVersion, - scmAddress, ugi, hadoopConf, - NetUtils.getDefaultSocketFactory(hadoopConf), - (int) scmClientConfig.getRpcTimeOut(), - connectionRetryPolicy).getProxy(); - } - - public RetryPolicy getSCMBlockLocationRetryPolicy() { - return new RetryPolicy() { - @Override - public RetryAction shouldRetry(Exception e, int retry, - int failover, boolean b) { - if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) { - setUpdatedLeaderNodeID(); - } else { - performFailoverToAssignedLeader(null, e); - } - return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount, - getRetryInterval()); - } - }; - } - - public synchronized int getCurrentProxyIndex() { - return currentProxyIndex; - } - - public synchronized void setUpdatedLeaderNodeID() { - this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); + protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) { + return scmNodeInfo.getBlockClientAddress(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java index 6e1d25bb0ffe..d0210128f0f0 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java @@ -17,31 +17,9 @@ package org.apache.hadoop.hdds.scm.proxy; -import com.google.common.annotations.VisibleForTesting; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,261 +27,28 @@ /** * Failover proxy provider for StorageContainerLocationProtocolPB. */ -public class SCMContainerLocationFailoverProxyProvider implements - FailoverProxyProvider, Closeable { +public class SCMContainerLocationFailoverProxyProvider extends + SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SCMContainerLocationFailoverProxyProvider.class); - // scmNodeId -> ProxyInfo - private final Map> scmProxies; - // scmNodeId -> SCMProxyInfo - private final Map scmProxyInfoMap; - private List scmNodeIds; - - // As SCM Client is shared across threads, performFailOver() - // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is - // updated in shouldRetry(). When 2 or more threads run in parallel, the - // RetryInvocationHandler will check the expectedFailOverCount - // and not execute performFailOver() for one of them. So the other thread(s) - // shall not call performFailOver(), it will call getProxy() which uses - // currentProxySCMNodeId and returns the proxy. - private volatile String currentProxySCMNodeId; - private volatile int currentProxyIndex; - - private final ConfigurationSource conf; - private final SCMClientConfig scmClientConfig; - private final long scmVersion; - - private String scmServiceId; - - private final int maxRetryCount; - private final long retryInterval; - - private final UserGroupInformation ugi; - - private String updatedLeaderNodeID = null; - /** * Construct SCMContainerLocationFailoverProxyProvider. * If userGroupInformation is not null, use the passed ugi, else obtain * from {@link UserGroupInformation#getCurrentUser()} - * @param conf - * @param userGroupInformation */ public SCMContainerLocationFailoverProxyProvider(ConfigurationSource conf, UserGroupInformation userGroupInformation) { - this.conf = conf; - - if (userGroupInformation == null) { - try { - this.ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException ex) { - LOG.error("Unable to fetch user credentials from UGI", ex); - throw new RuntimeException(ex); - } - } else { - this.ugi = userGroupInformation; - } - this.scmVersion = RPC.getProtocolVersion( - StorageContainerLocationProtocolPB.class); - - this.scmProxies = new HashMap<>(); - this.scmProxyInfoMap = new HashMap<>(); - loadConfigs(); - - this.currentProxyIndex = 0; - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - scmClientConfig = conf.getObject(SCMClientConfig.class); - this.maxRetryCount = scmClientConfig.getRetryCount(); - this.retryInterval = scmClientConfig.getRetryInterval(); - } - - @VisibleForTesting - protected synchronized void loadConfigs() { - List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); - - scmNodeIds = new ArrayList<>(); - - for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { - if (scmNodeInfo.getScmClientAddress() == null) { - throw new ConfigurationException("SCM Client Address could not " + - "be obtained from config. Config is not properly defined"); - } else { - InetSocketAddress scmClientAddress = - NetUtils.createSocketAddr(scmNodeInfo.getScmClientAddress()); - - scmServiceId = scmNodeInfo.getServiceId(); - String scmNodeId = scmNodeInfo.getNodeId(); - - scmNodeIds.add(scmNodeId); - SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, - scmClientAddress); - scmProxyInfoMap.put(scmNodeId, scmProxyInfo); - } - } - } - - @VisibleForTesting - public synchronized String getCurrentProxySCMNodeId() { - return currentProxySCMNodeId; - } - - @VisibleForTesting - public synchronized void changeCurrentProxy(String nodeId) { - currentProxyIndex = scmNodeIds.indexOf(nodeId); - currentProxySCMNodeId = nodeId; - nextProxyIndex(); + super(StorageContainerLocationProtocolPB.class, conf, userGroupInformation); } @Override - public synchronized ProxyInfo getProxy() { - ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); - if (currentProxyInfo == null) { - currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); - } - return currentProxyInfo; - } - - public synchronized List getProxies() { - for (SCMProxyInfo scmProxyInfo : scmProxyInfoMap.values()) { - if (scmProxies.get(scmProxyInfo.getNodeId()) == null) { - scmProxies.put(scmProxyInfo.getNodeId(), - createSCMProxy(scmProxyInfo.getNodeId())); - } - } - return scmProxies.values().stream() - .map(proxyInfo -> proxyInfo.proxy).collect(Collectors.toList()); + protected Logger getLogger() { + return LOG; } @Override - public synchronized void performFailover( - StorageContainerLocationProtocolPB newLeader) { - if (updatedLeaderNodeID != null) { - currentProxySCMNodeId = updatedLeaderNodeID; - } else { - nextProxyIndex(); - } - LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId()); - } - - public synchronized void performFailoverToAssignedLeader(String newLeader, - Exception e) { - ServerNotLeaderException snle = - (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); - if (snle != null && snle.getSuggestedLeader() != null) { - Optional matchedProxyInfo = - scmProxyInfoMap.values().stream().filter( - proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) - .equals(snle.getSuggestedLeader())).findFirst(); - if (matchedProxyInfo.isPresent()) { - newLeader = matchedProxyInfo.get().getNodeId(); - LOG.debug("Performing failover to suggested leader {}, nodeId {}", - snle.getSuggestedLeader(), newLeader); - } else { - LOG.debug("Suggested leader {} does not match with any of the " + - "proxyInfo adress {}", snle.getSuggestedLeader(), - Arrays.toString(scmProxyInfoMap.values().toArray())); - } - } - assignLeaderToNode(newLeader); - } - - @Override - public Class getInterface() { - return StorageContainerLocationProtocolPB.class; - } - - @Override - public synchronized void close() throws IOException { - for (ProxyInfo - proxy : scmProxies.values()) { - StorageContainerLocationProtocolPB scmProxy = - proxy.proxy; - if (scmProxy != null) { - RPC.stopProxy(scmProxy); - } - } - } - - private long getRetryInterval() { - // TODO add exponential backup - return retryInterval; - } - - private synchronized void nextProxyIndex() { - // round robin the next proxy - currentProxyIndex = (currentProxyIndex + 1) % scmProxyInfoMap.size(); - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - } - - private synchronized void assignLeaderToNode(String newLeaderNodeId) { - if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { - if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { - updatedLeaderNodeID = newLeaderNodeId; - LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID); - } else { - updatedLeaderNodeID = null; - } - } - } - - /** - * Creates proxy object. - */ - private ProxyInfo createSCMProxy(String nodeId) { - ProxyInfo proxyInfo; - SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); - InetSocketAddress address = scmProxyInfo.getAddress(); - try { - StorageContainerLocationProtocolPB scmProxy = createSCMProxy(address); - // Create proxyInfo here, to make it work with all Hadoop versions. - proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); - scmProxies.put(nodeId, proxyInfo); - return proxyInfo; - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to SCM at {}", - this.getClass().getSimpleName(), address, ioe); - throw new RuntimeException(ioe); - } - } - - - private StorageContainerLocationProtocolPB createSCMProxy( - InetSocketAddress scmAddress) throws IOException { - Configuration hadoopConf = - LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); - RPC.setProtocolEngine(hadoopConf, StorageContainerLocationProtocolPB.class, - ProtobufRpcEngine.class); - // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same OM in case of connection exception. This retry - // policy essentially results in TRY_ONCE_THEN_FAIL. - RetryPolicy connectionRetryPolicy = RetryPolicies - .failoverOnNetworkException(0); - return RPC.getProtocolProxy( - StorageContainerLocationProtocolPB.class, - scmVersion, scmAddress, ugi, - hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), - (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); - } - - public RetryPolicy getRetryPolicy() { - return new RetryPolicy() { - @Override - public RetryAction shouldRetry(Exception e, int retry, - int failover, boolean b) { - if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) { - setUpdatedLeaderNodeID(); - } else { - performFailoverToAssignedLeader(null, e); - } - return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount, - getRetryInterval()); - } - }; - } - - public synchronized void setUpdatedLeaderNodeID() { - this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); + protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) { + return scmNodeInfo.getScmClientAddress(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java new file mode 100644 index 000000000000..504504e597cb --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.proxy; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.ConfigurationException; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; +import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; +import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; +import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; + +/** + * A failover proxy provider base abstract class. + * Provides common methods for failover proxy provider + * implementations. Failover proxy provider allows clients to configure + * multiple SCMs to connect to. In case of SCM failover, client can try + * connecting to another SCM node from the list of proxies. + */ +public abstract class SCMFailoverProxyProviderBase implements FailoverProxyProvider { + + private final SCMClientConfig scmClientConfig; + + private final Class protocolClass; + + // scmNodeId -> ProxyInfo + private final Map> scmProxies; + // scmNodeId -> SCMProxyInfo + private final Map scmProxyInfoMap; + private List scmNodeIds; + + // As SCM Client is shared across threads, performFailOver() + // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is + // updated in shouldRetry(). When 2 or more threads run in parallel, the + // RetryInvocationHandler will check the expectedFailOverCount + // and not execute performFailOver() for one of them. So the other thread(s) + // shall not call performFailOver(), it will call getProxy() which uses + // currentProxySCMNodeId and returns the proxy. + private volatile String currentProxySCMNodeId; + private volatile int currentProxyIndex; + + private final ConfigurationSource conf; + private final long scmVersion; + + private final int maxRetryCount; + private final long retryInterval; + + private final UserGroupInformation ugi; + + private String updatedLeaderNodeID = null; + + /** + * Construct SCMFailoverProxyProviderBase. + * If userGroupInformation is not null, use the passed ugi, else obtain + * from {@link UserGroupInformation#getCurrentUser()} + */ + public SCMFailoverProxyProviderBase(Class protocol, ConfigurationSource conf, + UserGroupInformation userGroupInformation) { + this.protocolClass = protocol; + this.conf = conf; + + if (userGroupInformation == null) { + try { + this.ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException ex) { + getLogger().error("Unable to fetch user credentials from UGI", ex); + throw new RuntimeException(ex); + } + } else { + this.ugi = userGroupInformation; + } + this.scmVersion = RPC.getProtocolVersion(protocol); + + this.scmProxies = new HashMap<>(); + this.scmProxyInfoMap = new HashMap<>(); + loadConfigs(); + + this.currentProxyIndex = 0; + currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); + + scmClientConfig = conf.getObject(SCMClientConfig.class); + this.maxRetryCount = scmClientConfig.getRetryCount(); + this.retryInterval = scmClientConfig.getRetryInterval(); + + getLogger().info("Created fail-over proxy for protocol {} with {} nodes: {}", protocol.getSimpleName(), + scmNodeIds.size(), scmProxyInfoMap.values()); + } + + /** + * Get the logger implementation for the specific protocol's failover proxy provider. + */ + protected abstract Logger getLogger(); + + /** + * Get the specific protocol address from {@link SCMNodeInfo}. + * @param scmNodeInfo SCM node info which contains different protocols' address. + * @return protocol address. + */ + protected abstract String getProtocolAddress(SCMNodeInfo scmNodeInfo); + + /** + * Get the SCM node ID the current proxy is pointing to. + * This can be overridden with a single SCM node ID to disable SCM failover. + * See {@link SingleSecretKeyProtocolProxyProvider} + * @return current proxy's SCM Node ID. + */ + protected synchronized String getCurrentProxySCMNodeId() { + return currentProxySCMNodeId; + } + + @VisibleForTesting + protected synchronized void loadConfigs() { + List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); + scmNodeIds = new ArrayList<>(); + + + for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { + String protocolAddress = getProtocolAddress(scmNodeInfo); + if (protocolAddress == null) { + throw new ConfigurationException(protocolClass.getSimpleName() + " SCM Address could not " + + "be obtained from config. Config is not properly defined"); + } else { + InetSocketAddress protocolAddr = NetUtils.createSocketAddr(protocolAddress); + + String scmServiceId = scmNodeInfo.getServiceId(); + String scmNodeId = scmNodeInfo.getNodeId(); + scmNodeIds.add(scmNodeId); + SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, protocolAddr); + scmProxyInfoMap.put(scmNodeId, scmProxyInfo); + } + } + } + + + @VisibleForTesting + public synchronized void changeCurrentProxy(String nodeId) { + currentProxyIndex = scmNodeIds.indexOf(nodeId); + currentProxySCMNodeId = nodeId; + nextProxyIndex(); + } + + @Override + public synchronized ProxyInfo getProxy() { + ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); + if (currentProxyInfo == null) { + currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); + } + return currentProxyInfo; + } + + public synchronized List getProxies() { + for (SCMProxyInfo scmProxyInfo : scmProxyInfoMap.values()) { + if (scmProxies.get(scmProxyInfo.getNodeId()) == null) { + scmProxies.put(scmProxyInfo.getNodeId(), + createSCMProxy(scmProxyInfo.getNodeId())); + } + } + return scmProxies.values().stream() + .map(proxyInfo -> proxyInfo.proxy).collect(Collectors.toList()); + } + + @Override + public synchronized void performFailover(T newLeader) { + if (updatedLeaderNodeID != null) { + currentProxySCMNodeId = updatedLeaderNodeID; + } else { + nextProxyIndex(); + } + getLogger().debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId()); + } + + public synchronized void performFailoverToAssignedLeader(String newLeader, + Exception e) { + ServerNotLeaderException snle = + (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); + if (snle != null && snle.getSuggestedLeader() != null) { + Optional matchedProxyInfo = + scmProxyInfoMap.values().stream().filter( + proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) + .equals(snle.getSuggestedLeader())).findFirst(); + if (matchedProxyInfo.isPresent()) { + newLeader = matchedProxyInfo.get().getNodeId(); + getLogger().debug("Performing failover to suggested leader {}, nodeId {}", + snle.getSuggestedLeader(), newLeader); + } else { + getLogger().debug("Suggested leader {} does not match with any of the " + + "proxyInfo address {}", snle.getSuggestedLeader(), + Arrays.toString(scmProxyInfoMap.values().toArray())); + } + } + assignLeaderToNode(newLeader); + } + + @Override + public Class getInterface() { + return protocolClass; + } + + public List getSCMNodeIds() { + return Collections.unmodifiableList(scmNodeIds); + } + + public Collection getSCMProxyInfoList() { + return Collections.unmodifiableCollection(scmProxyInfoMap.values()); + } + + @Override + public synchronized void close() throws IOException { + for (ProxyInfo proxy : scmProxies.values()) { + T scmProxy = proxy.proxy; + if (scmProxy != null) { + RPC.stopProxy(scmProxy); + } + } + } + + private long getRetryInterval() { + // TODO add exponential backup + return retryInterval; + } + + private synchronized void nextProxyIndex() { + // round robin the next proxy + currentProxyIndex = (currentProxyIndex + 1) % scmProxyInfoMap.size(); + currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); + } + + private synchronized void assignLeaderToNode(String newLeaderNodeId) { + if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { + if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { + updatedLeaderNodeID = newLeaderNodeId; + getLogger().debug("Updated LeaderNodeID {}", updatedLeaderNodeID); + } else { + updatedLeaderNodeID = null; + } + } + } + + /** + * Creates proxy object. + */ + private ProxyInfo createSCMProxy(String nodeId) { + ProxyInfo proxyInfo; + SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); + InetSocketAddress address = scmProxyInfo.getAddress(); + try { + T scmProxy = createSCMProxy(address); + // Create proxyInfo here, to make it work with all Hadoop versions. + proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); + scmProxies.put(nodeId, proxyInfo); + return proxyInfo; + } catch (IOException ioe) { + getLogger().error("{} Failed to create RPC proxy to SCM at {}", + this.getClass().getSimpleName(), address, ioe); + throw new RuntimeException(ioe); + } + } + + private T createSCMProxy(InetSocketAddress scmAddress) throws IOException { + Configuration hadoopConf = + LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); + RPC.setProtocolEngine(hadoopConf, protocolClass, ProtobufRpcEngine.class); + // FailoverOnNetworkException ensures that the IPC layer does not attempt + // retries on the same SCM in case of connection exception. This retry + // policy essentially results in TRY_ONCE_THEN_FAIL. + RetryPolicy connectionRetryPolicy = RetryPolicies.failoverOnNetworkException(0); + return RPC.getProtocolProxy( + protocolClass, + scmVersion, scmAddress, ugi, + hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), + (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); + } + + public RetryPolicy getRetryPolicy() { + return new RetryPolicy() { + @Override + public RetryAction shouldRetry(Exception e, int retry, + int failover, boolean b) { + if (getLogger().isDebugEnabled()) { + if (e.getCause() != null) { + getLogger().debug("RetryProxy: SCM Server {}: {}: {}", + getCurrentProxySCMNodeId(), + e.getCause().getClass().getSimpleName(), + e.getCause().getMessage()); + } else { + getLogger().debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), + e.getMessage()); + } + } + + if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) { + setUpdatedLeaderNodeID(); + } else { + performFailoverToAssignedLeader(null, e); + } + return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount, + getRetryInterval()); + } + }; + } + + public synchronized void setUpdatedLeaderNodeID() { + this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java index c7b50af67af9..60e07b90b417 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java @@ -17,30 +17,9 @@ package org.apache.hadoop.hdds.scm.proxy; -import com.google.common.base.Preconditions; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB; -import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; -import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,260 +27,26 @@ /** * Failover proxy provider for SCMSecurityProtocol server. */ -public class SCMSecurityProtocolFailoverProxyProvider implements - FailoverProxyProvider, Closeable { +public class SCMSecurityProtocolFailoverProxyProvider extends SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SCMSecurityProtocolFailoverProxyProvider.class); - // scmNodeId -> ProxyInfo - private final Map> scmProxies; - - // scmNodeId -> SCMProxyInfo - private final Map scmProxyInfoMap; - - private List scmNodeIds; - - // As SCM Client is shared across threads, performFailOver() - // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is - // updated in shouldRetry(). When 2 or more threads run in parallel, the - // RetryInvocationHandler will check the expectedFailOverCount - // and not execute performFailOver() for one of them. So the other thread(s) - // shall not call performFailOver(), it will call getProxy() which uses - // currentProxySCMNodeId and returns the proxy. - private volatile String currentProxySCMNodeId; - private volatile int currentProxyIndex; - - - private final ConfigurationSource conf; - private final SCMClientConfig scmClientConfig; - private final long scmVersion; - - private String scmServiceId; - - private final int maxRetryCount; - private final long retryInterval; - - private final UserGroupInformation ugi; - - private String updatedLeaderNodeID = null; - /** * Construct fail-over proxy provider for SCMSecurityProtocol Server. - * @param conf - * @param userGroupInformation */ public SCMSecurityProtocolFailoverProxyProvider(ConfigurationSource conf, UserGroupInformation userGroupInformation) { - Preconditions.checkNotNull(userGroupInformation); - this.ugi = userGroupInformation; - this.conf = conf; - this.scmVersion = RPC.getProtocolVersion(SCMSecurityProtocolPB.class); - - this.scmProxies = new HashMap<>(); - this.scmProxyInfoMap = new HashMap<>(); - loadConfigs(); - - this.currentProxyIndex = 0; - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - scmClientConfig = conf.getObject(SCMClientConfig.class); - this.maxRetryCount = scmClientConfig.getRetryCount(); - this.retryInterval = scmClientConfig.getRetryInterval(); - } - - protected synchronized void loadConfigs() { - List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); - scmNodeIds = new ArrayList<>(); - - for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { - if (scmNodeInfo.getScmSecurityAddress() == null) { - throw new ConfigurationException("SCM Client Address could not " + - "be obtained from config. Config is not properly defined"); - } else { - InetSocketAddress scmSecurityAddress = - NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress()); - - scmServiceId = scmNodeInfo.getServiceId(); - String scmNodeId = scmNodeInfo.getNodeId(); - - scmNodeIds.add(scmNodeId); - SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, - scmSecurityAddress); - scmProxyInfoMap.put(scmNodeId, scmProxyInfo); - } - } + super(SCMSecurityProtocolPB.class, conf, userGroupInformation); } @Override - public synchronized ProxyInfo getProxy() { - ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); - if (currentProxyInfo == null) { - currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); - } - return currentProxyInfo; - } - - /** - * Creates proxy object. - */ - private ProxyInfo createSCMProxy(String nodeId) { - ProxyInfo proxyInfo; - SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); - InetSocketAddress address = scmProxyInfo.getAddress(); - try { - SCMSecurityProtocolPB scmProxy = createSCMProxy(address); - // Create proxyInfo here, to make it work with all Hadoop versions. - proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString()); - scmProxies.put(nodeId, proxyInfo); - return proxyInfo; - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to SCM at {}", - this.getClass().getSimpleName(), address, ioe); - throw new RuntimeException(ioe); - } + protected Logger getLogger() { + return LOG; } - private SCMSecurityProtocolPB createSCMProxy(InetSocketAddress scmAddress) - throws IOException { - Configuration hadoopConf = - LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); - RPC.setProtocolEngine(hadoopConf, SCMSecurityProtocolPB.class, - ProtobufRpcEngine.class); - - // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same SCM in case of connection exception. This retry - // policy essentially results in TRY_ONCE_THEN_FAIL. - - RetryPolicy connectionRetryPolicy = RetryPolicies - .failoverOnNetworkException(0); - - return RPC.getProtocolProxy(SCMSecurityProtocolPB.class, - scmVersion, scmAddress, ugi, - hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), - (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); - } - - @Override - public synchronized void performFailover(SCMSecurityProtocolPB currentProxy) { - if (updatedLeaderNodeID != null) { - currentProxySCMNodeId = updatedLeaderNodeID; - } else { - nextProxyIndex(); - } - LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId()); - } - - public synchronized void performFailoverToAssignedLeader(String newLeader, - Exception e) { - ServerNotLeaderException snle = - (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); - if (snle != null && snle.getSuggestedLeader() != null) { - Optional< SCMProxyInfo > matchedProxyInfo = - scmProxyInfoMap.values().stream().filter( - proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) - .equals(snle.getSuggestedLeader())).findFirst(); - if (matchedProxyInfo.isPresent()) { - newLeader = matchedProxyInfo.get().getNodeId(); - LOG.debug("Performing failover to suggested leader {}, nodeId {}", - snle.getSuggestedLeader(), newLeader); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Suggested leader {} does not match with any of the " + - "proxyInfo adress {}", snle.getSuggestedLeader(), - Arrays.toString(scmProxyInfoMap.values().toArray())); - } - } - } - assignLeaderToNode(newLeader); - } - - - private synchronized void assignLeaderToNode(String newLeaderNodeId) { - if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { - if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { - updatedLeaderNodeID = newLeaderNodeId; - LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID); - } else { - updatedLeaderNodeID = null; - } - } - } - - /** - * Update the proxy index to the next proxy in the list. - * @return the new proxy index - */ - private synchronized void nextProxyIndex() { - // round robin the next proxy - currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size(); - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - } - - public RetryPolicy getRetryPolicy() { - // Client will attempt up to maxFailovers number of failovers between - // available SCMs before throwing exception. - RetryPolicy retryPolicy = new RetryPolicy() { - @Override - public RetryAction shouldRetry(Exception exception, int retries, - int failovers, boolean isIdempotentOrAtMostOnce) - throws Exception { - - if (LOG.isDebugEnabled()) { - if (exception.getCause() != null) { - LOG.debug("RetryProxy: SCM Security Server {}: {}: {}", - getCurrentProxySCMNodeId(), - exception.getCause().getClass().getSimpleName(), - exception.getCause().getMessage()); - } else { - LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), - exception.getMessage()); - } - } - - if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) { - setUpdatedLeaderNodeID(); - } else { - performFailoverToAssignedLeader(null, exception); - } - return SCMHAUtils - .getRetryAction(failovers, retries, exception, maxRetryCount, - getRetryInterval()); - } - }; - - return retryPolicy; - } - - public synchronized void setUpdatedLeaderNodeID() { - this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); - } - - @Override - public Class< SCMSecurityProtocolPB > getInterface() { - return SCMSecurityProtocolPB.class; - } - - @Override - public synchronized void close() throws IOException { - for (ProxyInfo proxyInfo : scmProxies.values()) { - SCMSecurityProtocolPB proxy = proxyInfo.proxy; - if (proxy != null) { - RPC.stopProxy(proxy); - } - } - } - - public synchronized String getCurrentProxySCMNodeId() { - return currentProxySCMNodeId; - } - - public synchronized int getCurrentProxyIndex() { - return currentProxyIndex; - } - - private long getRetryInterval() { - return retryInterval; + protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) { + return scmNodeInfo.getScmSecurityAddress(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java index 25f5b5524e40..0359bb062cd8 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SecretKeyProtocolFailoverProxyProvider.java @@ -17,30 +17,9 @@ package org.apache.hadoop.hdds.scm.proxy; -import com.google.common.base.Preconditions; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.ConfigurationException; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService; -import org.apache.hadoop.hdds.ratis.ServerNotLeaderException; -import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo; -import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; -import org.apache.hadoop.io.retry.FailoverProxyProvider; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,254 +28,26 @@ * Failover proxy provider for SCMSecretKeyProtocolService server. */ public class SecretKeyProtocolFailoverProxyProvider - implements - FailoverProxyProvider, Closeable { + extends SCMFailoverProxyProviderBase { public static final Logger LOG = LoggerFactory.getLogger(SecretKeyProtocolFailoverProxyProvider.class); - // scmNodeId -> ProxyInfo - private final Map> scmProxies; - - // scmNodeId -> SCMProxyInfo - private final Map scmProxyInfoMap; - - private List scmNodeIds; - - // As SCM Client is shared across threads, performFailOver() - // updates the currentProxySCMNodeId based on the updateLeaderNodeId which is - // updated in shouldRetry(). When 2 or more threads run in parallel, the - // RetryInvocationHandler will check the expectedFailOverCount - // and not execute performFailOver() for one of them. So the other thread(s) - // shall not call performFailOver(), it will call getProxy() which uses - // currentProxySCMNodeId and returns the proxy. - private volatile String currentProxySCMNodeId; - private volatile int currentProxyIndex; - - - private final ConfigurationSource conf; - private final SCMClientConfig scmClientConfig; - private final long scmVersion; - - private String scmServiceId; - - private final int maxRetryCount; - private final long retryInterval; - - private final UserGroupInformation ugi; - private final Class proxyClazz; - - private String updatedLeaderNodeID = null; - /** * Construct fail-over proxy provider for SCMSecurityProtocol Server. - * @param conf - * @param userGroupInformation */ public SecretKeyProtocolFailoverProxyProvider(ConfigurationSource conf, UserGroupInformation userGroupInformation, Class proxyClazz) { - Preconditions.checkNotNull(userGroupInformation); - this.ugi = userGroupInformation; - this.conf = conf; - this.proxyClazz = proxyClazz; - this.scmVersion = RPC.getProtocolVersion(proxyClazz); - - this.scmProxies = new HashMap<>(); - this.scmProxyInfoMap = new HashMap<>(); - loadConfigs(); - - this.currentProxyIndex = 0; - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - scmClientConfig = conf.getObject(SCMClientConfig.class); - this.maxRetryCount = scmClientConfig.getRetryCount(); - this.retryInterval = scmClientConfig.getRetryInterval(); - } - - protected synchronized void loadConfigs() { - List scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf); - scmNodeIds = new ArrayList<>(); - - for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) { - if (scmNodeInfo.getScmSecurityAddress() == null) { - throw new ConfigurationException("SCM Client Address could not " + - "be obtained from config. Config is not properly defined"); - } else { - InetSocketAddress scmSecurityAddress = - NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress()); - - scmServiceId = scmNodeInfo.getServiceId(); - String scmNodeId = scmNodeInfo.getNodeId(); - - scmNodeIds.add(scmNodeId); - SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId, - scmSecurityAddress); - scmProxyInfoMap.put(scmNodeId, scmProxyInfo); - } - } + super(proxyClazz, conf, userGroupInformation); } @Override - public synchronized ProxyInfo getProxy() { - ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId()); - if (currentProxyInfo == null) { - currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId()); - } - return currentProxyInfo; - } - - /** - * Creates proxy object. - */ - private ProxyInfo createSCMProxy(String nodeId) { - ProxyInfo proxyInfo; - SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId); - InetSocketAddress address = scmProxyInfo.getAddress(); - try { - T scmProxy = createSCMProxy(address); - // Create proxyInfo here, to make it work with all Hadoop versions. - proxyInfo = new ProxyInfo(scmProxy, scmProxyInfo.toString()); - scmProxies.put(nodeId, proxyInfo); - return proxyInfo; - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to SCM at {}", - this.getClass().getSimpleName(), address, ioe); - throw new RuntimeException(ioe); - } + protected Logger getLogger() { + return LOG; } - private T createSCMProxy(InetSocketAddress scmAddress) - throws IOException { - Configuration hadoopConf = - LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); - RPC.setProtocolEngine(hadoopConf, proxyClazz, - ProtobufRpcEngine.class); - - // FailoverOnNetworkException ensures that the IPC layer does not attempt - // retries on the same SCM in case of connection exception. This retry - // policy essentially results in TRY_ONCE_THEN_FAIL. - - RetryPolicy connectionRetryPolicy = RetryPolicies - .failoverOnNetworkException(0); - - return RPC.getProtocolProxy(proxyClazz, - scmVersion, scmAddress, ugi, - hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf), - (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy(); - } - - @Override - public synchronized void performFailover(T currentProxy) { - if (updatedLeaderNodeID != null) { - currentProxySCMNodeId = updatedLeaderNodeID; - } else { - nextProxyIndex(); - } - LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId()); - } - - public synchronized void performFailoverToAssignedLeader(String newLeader, - Exception e) { - ServerNotLeaderException snle = - (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e); - if (snle != null && snle.getSuggestedLeader() != null) { - Optional< SCMProxyInfo > matchedProxyInfo = - scmProxyInfoMap.values().stream().filter( - proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress()) - .equals(snle.getSuggestedLeader())).findFirst(); - if (matchedProxyInfo.isPresent()) { - newLeader = matchedProxyInfo.get().getNodeId(); - LOG.debug("Performing failover to suggested leader {}, nodeId {}", - snle.getSuggestedLeader(), newLeader); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Suggested leader {} does not match with any of the " + - "proxyInfo adress {}", snle.getSuggestedLeader(), - Arrays.toString(scmProxyInfoMap.values().toArray())); - } - } - } - assignLeaderToNode(newLeader); - } - - - private synchronized void assignLeaderToNode(String newLeaderNodeId) { - if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { - if (scmProxyInfoMap.containsKey(newLeaderNodeId)) { - updatedLeaderNodeID = newLeaderNodeId; - LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID); - } else { - updatedLeaderNodeID = null; - } - } - } - - /** - * Update the proxy index to the next proxy in the list. - * @return the new proxy index - */ - private synchronized void nextProxyIndex() { - // round robin the next proxy - currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size(); - currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex); - } - - public RetryPolicy getRetryPolicy() { - // Client will attempt up to maxFailovers number of failovers between - // available SCMs before throwing exception. - - return (exception, retries, failovers, isIdempotentOrAtMostOnce) -> { - - if (LOG.isDebugEnabled()) { - if (exception.getCause() != null) { - LOG.debug("RetryProxy: SCM Security Server {}: {}: {}", - getCurrentProxySCMNodeId(), - exception.getCause().getClass().getSimpleName(), - exception.getCause().getMessage()); - } else { - LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(), - exception.getMessage()); - } - } - - if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) { - setUpdatedLeaderNodeID(); - } else { - performFailoverToAssignedLeader(null, exception); - } - return SCMHAUtils - .getRetryAction(failovers, retries, exception, maxRetryCount, - getRetryInterval()); - }; - } - - public synchronized void setUpdatedLeaderNodeID() { - this.updatedLeaderNodeID = getCurrentProxySCMNodeId(); - } - - @Override - public Class getInterface() { - return proxyClazz; - } - - @Override - public synchronized void close() throws IOException { - for (ProxyInfo proxyInfo : scmProxies.values()) { - if (proxyInfo.proxy != null) { - RPC.stopProxy(proxyInfo.proxy); - } - } - } - - public synchronized String getCurrentProxySCMNodeId() { - return currentProxySCMNodeId; - } - - public synchronized int getCurrentProxyIndex() { - return currentProxyIndex; - } - - private long getRetryInterval() { - return retryInterval; + protected String getProtocolAddress(SCMNodeInfo scmNodeInfo) { + return scmNodeInfo.getScmSecurityAddress(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java index c41e3627ea50..2cfa051a9364 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SingleSecretKeyProtocolProxyProvider.java @@ -20,6 +20,8 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Proxy provider for SCMSecretKeyProtocolService against a @@ -28,6 +30,10 @@ public class SingleSecretKeyProtocolProxyProvider extends SecretKeyProtocolFailoverProxyProvider { + + public static final Logger LOG = + LoggerFactory.getLogger(SingleSecretKeyProtocolProxyProvider.class); + private final String scmNodeId; public SingleSecretKeyProtocolProxyProvider( @@ -49,6 +55,11 @@ public synchronized void performFailover(T currentProxy) { // do nothing. } + @Override + protected Logger getLogger() { + return LOG; + } + @Override public synchronized void performFailoverToAssignedLeader(String newLeader, Exception e) {