diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index cf888697313b..e5ee1234e335 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -68,7 +68,9 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -92,15 +94,20 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   private static final RpcController NULL_RPC_CONTROLLER = null;
 
   private final StorageContainerLocationProtocolPB rpcProxy;
+  private final SCMContainerLocationFailoverProxyProvider failoverProxyProvider;
 
   /**
    * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
    *
-   * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
+   * @param proxyProvider {@link SCMContainerLocationFailoverProxyProvider}
    */
   public StorageContainerLocationProtocolClientSideTranslatorPB(
-      StorageContainerLocationProtocolPB rpcProxy) {
-    this.rpcProxy = rpcProxy;
+      SCMContainerLocationFailoverProxyProvider proxyProvider) {
+    Preconditions.checkNotNull(proxyProvider);
+    this.failoverProxyProvider = proxyProvider;
+    this.rpcProxy = (StorageContainerLocationProtocolPB) RetryProxy.create(
+        StorageContainerLocationProtocolPB.class, failoverProxyProvider,
+        failoverProxyProvider.getSCMContainerLocationRetryPolicy(null));
   }
 
   /**
@@ -127,7 +134,13 @@ private ScmContainerLocationResponse submitRequest(
 
   private ScmContainerLocationResponse submitRpcRequest(
       ScmContainerLocationRequest wrapper) throws ServiceException {
-    return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+    ScmContainerLocationResponse response =
+        rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+    if (response.getStatus() ==
+        ScmContainerLocationResponse.Status.SCM_NOT_LEADER) {
+      failoverProxyProvider.performFailoverToAssignedLeader(null);
+    }
+    return response;
   }
 
   /**
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
index a9ff4c1ea775..bcc1a01c13c4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -51,7 +51,7 @@ import static org.apache.hadoop.hdds.HddsUtils.getHostName;
 
 /**
- * Failover proxy provider for SCM.
+ * Failover proxy provider for SCM block location.
  */
 public class SCMBlockLocationFailoverProxyProvider implements
     FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
new file mode 100644
index 000000000000..a04a66f4f278
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMContainerLocationFailoverProxyProvider.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.hadoop.hdds.HddsUtils.getHostName;
+import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
+
+/**
+ * Failover proxy provider for SCM container location.
+ */
+public class SCMContainerLocationFailoverProxyProvider implements
+    FailoverProxyProvider<StorageContainerLocationProtocolPB>, Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMContainerLocationFailoverProxyProvider.class);
+
+  private Map<String,
+      ProxyInfo<StorageContainerLocationProtocolPB>> scmProxies;
+  private Map<String, SCMProxyInfo> scmProxyInfoMap;
+  private List<String> scmNodeIDList;
+
+  private String currentProxySCMNodeId;
+  private int currentProxyIndex;
+
+  private final ConfigurationSource conf;
+  private final SCMClientConfig scmClientConfig;
+  private final long scmVersion;
+
+  private final String scmServiceId;
+
+  private final int maxRetryCount;
+  private final long retryInterval;
+
+  public static final String SCM_DUMMY_NODEID_PREFIX = "scm";
+
+  public SCMContainerLocationFailoverProxyProvider(ConfigurationSource conf) {
+    this.conf = conf;
+    this.scmVersion = RPC.getProtocolVersion(
+        StorageContainerLocationProtocolPB.class);
+    this.scmServiceId = conf.getTrimmed(OZONE_SCM_SERVICE_IDS_KEY);
+    this.scmProxies = new HashMap<>();
+    this.scmProxyInfoMap = new HashMap<>();
+    this.scmNodeIDList = new ArrayList<>();
+    loadConfigs();
+
+    this.currentProxyIndex = 0;
+    currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex);
+    scmClientConfig = conf.getObject(SCMClientConfig.class);
+    this.maxRetryCount = scmClientConfig.getRetryCount();
+    this.retryInterval = scmClientConfig.getRetryInterval();
+  }
+
+  @VisibleForTesting
+  protected Collection<InetSocketAddress> getSCMAddressList() {
+    Collection<String> scmAddressList =
+        conf.getTrimmedStringCollection(OZONE_SCM_NAMES);
+    Collection<InetSocketAddress> resultList = new ArrayList<>();
+    if (!scmAddressList.isEmpty()) {
+      final int port = getPortNumberFromConfigKeys(conf,
+          ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY)
+          .orElse(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT);
+      for (String scmAddress : scmAddressList) {
+        LOG.debug("SCM Address for proxy is {}", scmAddress);
+
+        Optional<String> hostname = getHostName(scmAddress);
+        if (hostname.isPresent()) {
+          resultList.add(NetUtils.createSocketAddr(
+              hostname.get() + ":" + port));
+        }
+      }
+    }
+    if (resultList.isEmpty()) {
+      // fall back
+      resultList.add(getScmAddressForClients(conf));
+    }
+    return resultList;
+  }
+
+  private void loadConfigs() {
+    Collection<InetSocketAddress> scmAddressList = getSCMAddressList();
+    int scmNodeIndex = 1;
+    for (InetSocketAddress scmAddress : scmAddressList) {
+      String nodeId = SCM_DUMMY_NODEID_PREFIX + scmNodeIndex;
+      if (scmAddress == null) {
+        LOG.error("Failed to create SCM proxy for {}.", nodeId);
+        continue;
+      }
+      scmNodeIndex++;
+      SCMProxyInfo scmProxyInfo = new SCMProxyInfo(
+          scmServiceId, nodeId, scmAddress);
+      ProxyInfo<StorageContainerLocationProtocolPB> proxy
+          = new ProxyInfo<>(null, scmProxyInfo.toString());
+      scmProxies.put(nodeId, proxy);
+      scmProxyInfoMap.put(nodeId, scmProxyInfo);
+      scmNodeIDList.add(nodeId);
+    }
+
+    if (scmProxies.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for SCM. Please configure the system with " +
+          OZONE_SCM_NAMES);
+    }
+  }
+
+  @VisibleForTesting
+  public synchronized String getCurrentProxyOMNodeId() {
+    return currentProxySCMNodeId;
+  }
+
+  @Override
+  public synchronized ProxyInfo getProxy() {
+    ProxyInfo currentProxyInfo = scmProxies.get(currentProxySCMNodeId);
+    createSCMProxyIfNeeded(currentProxyInfo, currentProxySCMNodeId);
+    return currentProxyInfo;
+  }
+
+  @Override
+  public void performFailover(
+      StorageContainerLocationProtocolPB newLeader) {
+    // Should do nothing here.
+    LOG.debug("Failing over to next proxy. {}",
+        getCurrentProxyOMNodeId());
+  }
+
+  public void performFailoverToAssignedLeader(String newLeader) {
+    if (newLeader == null) {
+      // If newLeader is not assigned, it will fail over to next proxy.
+      nextProxyIndex();
+    } else {
+      if (!assignLeaderToNode(newLeader)) {
+        LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
+        nextProxyIndex();
+      }
+    }
+  }
+
+  @Override
+  public Class<
+      StorageContainerLocationProtocolPB> getInterface() {
+    return StorageContainerLocationProtocolPB.class;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    for (ProxyInfo<StorageContainerLocationProtocolPB>
+        proxy : scmProxies.values()) {
+      StorageContainerLocationProtocolPB scmProxy =
+          proxy.proxy;
+      if (scmProxy != null) {
+        RPC.stopProxy(scmProxy);
+      }
+    }
+  }
+
+  public RetryPolicy.RetryAction getRetryAction(int failovers) {
+    if (failovers < maxRetryCount) {
+      return new RetryPolicy.RetryAction(
+          RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+          getRetryInterval());
+    } else {
+      return RetryPolicy.RetryAction.FAIL;
+    }
+  }
+
+  private synchronized long getRetryInterval() {
+    // TODO add exponential backoff
+    return retryInterval;
+  }
+
+  private synchronized int nextProxyIndex() {
+//    lastAttemptedLeader = currentProxySCMNodeId;
+
+    // round robin the next proxy
+    currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size();
+    currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex);
+    return currentProxyIndex;
+  }
+
+  synchronized boolean assignLeaderToNode(String newLeaderNodeId) {
+    if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
+      if (scmProxies.containsKey(newLeaderNodeId)) {
+//        lastAttemptedLeader = currentProxySCMNodeId;
+        currentProxySCMNodeId = newLeaderNodeId;
+        currentProxyIndex = scmNodeIDList.indexOf(currentProxySCMNodeId);
+        return true;
+      }
+    }
+//    } else {
+//      lastAttemptedLeader = currentProxySCMNodeId;
+//    }
+    return false;
+  }
+
+  /**
+   * Creates proxy object if it does not already exist.
+   */
+  private void createSCMProxyIfNeeded(ProxyInfo proxyInfo,
+      String nodeId) {
+    if (proxyInfo.proxy == null) {
+      InetSocketAddress address = scmProxyInfoMap.get(nodeId).getAddress();
+      try {
+        StorageContainerLocationProtocolPB proxy =
+            createSCMProxy(address);
+        try {
+          proxyInfo.proxy = proxy;
+        } catch (IllegalAccessError iae) {
+          scmProxies.put(nodeId,
+              new ProxyInfo<>(proxy, proxyInfo.proxyInfo));
+        }
+      } catch (IOException ioe) {
+        LOG.error("{} Failed to create RPC proxy to SCM at {}",
+            this.getClass().getSimpleName(), address, ioe);
+        throw new RuntimeException(ioe);
+      }
+    }
+  }
+
+  private StorageContainerLocationProtocolPB createSCMProxy(
+      InetSocketAddress scmAddress) throws IOException {
+    Configuration hadoopConf =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+    RPC.setProtocolEngine(hadoopConf, StorageContainerLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    return RPC.getProxy(
+        StorageContainerLocationProtocolPB.class,
+        scmVersion, scmAddress, UserGroupInformation.getCurrentUser(),
+        hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
+        (int)scmClientConfig.getRpcTimeOut());
+  }
+
+  public RetryPolicy getSCMContainerLocationRetryPolicy(
+      String suggestedLeader) {
+    RetryPolicy retryPolicy = new RetryPolicy() {
+      @Override
+      public RetryAction shouldRetry(Exception e, int retry,
+          int failover, boolean b) {
+        performFailoverToAssignedLeader(suggestedLeader);
+        return getRetryAction(failover);
+      }
+    };
+    return retryPolicy;
+  }
+}
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 91dbebe33b88..739377551fea 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -100,6 +100,7 @@ message ScmContainerLocationResponse {
     OK = 1;
     CONTAINER_ALREADY_EXISTS = 2;
     CONTAINER_IS_MISSING = 3;
+    SCM_NOT_LEADER = 4;
   }
 }
 
@@ -147,6 +148,7 @@ message ContainerResponseProto {
     success = 1;
     errorContainerAlreadyExists = 2;
     errorContainerMissing = 3;
+    scmNotLeader = 4;
   }
   required Error errorCode = 1;
   required ContainerWithPipeline containerWithPipeline = 2;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 24f17f124c44..b2f6534372a1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -73,6 +73,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
@@ -120,9 +121,22 @@ public StorageContainerLocationProtocolServerSideTranslatorPB(
         protocolMetrics, LOG);
   }
 
+  private boolean isLeader() throws ServiceException {
+    if (!(impl instanceof SCMClientProtocolServer)) {
+      throw new ServiceException("Should be SCMClientProtocolServer");
+    } else {
+      return ((SCMClientProtocolServer) impl).getScm().checkLeader();
+    }
+  }
+
   @Override
   public ScmContainerLocationResponse submitRequest(RpcController controller,
       ScmContainerLocationRequest request) throws ServiceException {
+    if (!isLeader()) {
+      return ScmContainerLocationResponse.newBuilder()
+          .setCmdType(request.getCmdType()).setTraceID(request.getTraceID())
+          .setSuccess(false).setStatus(Status.SCM_NOT_LEADER).build();
+    }
     return dispatcher
         .processRequest(request, this::processRequest, request.getCmdType(),
             request.getTraceID());
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 038364227f1a..f67addfcc8ae 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -17,14 +17,11 @@
  */
 package org.apache.hadoop.hdds.scm.cli;
 
-import javax.net.SocketFactory;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -41,20 +38,13 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
@@ -116,25 +106,13 @@ private XceiverClientManager newXCeiverClientManager(ConfigurationSource conf)
   }
 
   public static StorageContainerLocationProtocol newContainerRpcClient(
-      ConfigurationSource configSource) throws IOException {
-
-    Class<StorageContainerLocationProtocolPB> protocol =
-        StorageContainerLocationProtocolPB.class;
-    Configuration conf =
-        LegacyHadoopConfigurationSource.asHadoopConfiguration(configSource);
-    RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
-    long version = RPC.getProtocolVersion(protocol);
-    InetSocketAddress scmAddress = getScmAddressForClients(configSource);
-    UserGroupInformation user = UserGroupInformation.getCurrentUser();
-    SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf);
-    int rpcTimeOut = Client.getRpcTimeout(conf);
-
-    StorageContainerLocationProtocolPB rpcProxy =
-        RPC.getProxy(protocol, version, scmAddress, user, conf,
-            socketFactory, rpcTimeOut);
+      ConfigurationSource configSource) {
+    SCMContainerLocationFailoverProxyProvider proxyProvider =
+        new SCMContainerLocationFailoverProxyProvider(configSource);
     StorageContainerLocationProtocolClientSideTranslatorPB client =
-        new StorageContainerLocationProtocolClientSideTranslatorPB(rpcProxy);
+        new StorageContainerLocationProtocolClientSideTranslatorPB(
+            proxyProvider);
 
     return TracingUtil.createProxy(
         client, StorageContainerLocationProtocol.class, configSource);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 099b0697de31..340b902ddc1a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -45,16 +45,13 @@
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
@@ -64,7 +61,6 @@
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.recon.ConfigurationProvider;
 import org.apache.hadoop.ozone.recon.ReconServer;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -284,18 +280,17 @@ public OzoneClient getRpcClient() throws IOException {
    */
   @Override
   public StorageContainerLocationProtocolClientSideTranslatorPB
-      getStorageContainerLocationClient() throws IOException {
-    long version = RPC.getProtocolVersion(
-        StorageContainerLocationProtocolPB.class);
+      getStorageContainerLocationClient() {
     InetSocketAddress address = scm.getClientRpcAddress();
     LOG.info(
         "Creating StorageContainerLocationProtocol RPC client with address {}",
         address);
+
+    SCMContainerLocationFailoverProxyProvider proxyProvider =
+        new SCMContainerLocationFailoverProxyProvider(conf);
+
     return new StorageContainerLocationProtocolClientSideTranslatorPB(
-        RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
-            address, UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf),
-            Client.getRpcTimeout(conf)));
+        proxyProvider);
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 07424b376312..e7f064e00212 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -72,8 +72,8 @@
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
+import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
@@ -93,12 +93,10 @@
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -848,22 +846,13 @@ private static ScmBlockLocationProtocol getScmBlockClient(
    * @throws IOException
    */
   private static StorageContainerLocationProtocol getScmContainerClient(
-      OzoneConfiguration conf) throws IOException {
-    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-    long scmVersion =
-        RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
-    InetSocketAddress scmAddr = getScmAddressForClients(
-        conf);
+      OzoneConfiguration conf) {
+    SCMContainerLocationFailoverProxyProvider proxyProvider =
+        new SCMContainerLocationFailoverProxyProvider(conf);
     StorageContainerLocationProtocol scmContainerClient =
         TracingUtil.createProxy(
             new StorageContainerLocationProtocolClientSideTranslatorPB(
-                RPC.getProxy(StorageContainerLocationProtocolPB.class,
-                    scmVersion,
-                    scmAddr, UserGroupInformation.getCurrentUser(), conf,
-                    NetUtils.getDefaultSocketFactory(conf),
-                    Client.getRpcTimeout(conf))),
-            StorageContainerLocationProtocol.class, conf);
+                proxyProvider), StorageContainerLocationProtocol.class, conf);
     return scmContainerClient;
   }
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index cb667f43855b..c08ca873323a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -171,11 +171,7 @@ OzoneManagerProtocol getOzoneManagerProtocol(
   StorageContainerLocationProtocol getSCMProtocol(
       final OzoneConfiguration configuration) {
     StorageContainerLocationProtocol storageContainerLocationProtocol = null;
-    try {
-      storageContainerLocationProtocol = newContainerRpcClient(configuration);
-    } catch (IOException e) {
-      LOG.error("Error in provisioning StorageContainerLocationProtocol ", e);
-    }
+    storageContainerLocationProtocol = newContainerRpcClient(configuration);
     return storageContainerLocationProtocol;
   }
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
index 1cfff127097c..18f75dee3219 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
@@ -18,7 +18,6 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.InetSocketAddress;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -34,12 +33,10 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -60,7 +57,6 @@
 import io.opentracing.util.GlobalTracer;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.RandomStringUtils;
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
 import org.apache.ratis.protocol.ClientId;
 import org.slf4j.Logger;
@@ -337,24 +333,13 @@ public OzoneManagerProtocolClientSideTranslatorPB createOmClient(
   }
 
   public StorageContainerLocationProtocol createStorageContainerLocationClient(
-      OzoneConfiguration ozoneConf)
-      throws IOException {
-
-    long version = RPC.getProtocolVersion(
-        StorageContainerLocationProtocolPB.class);
-    InetSocketAddress scmAddress =
-        getScmAddressForClients(ozoneConf);
-
-    RPC.setProtocolEngine(ozoneConf, StorageContainerLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
+      OzoneConfiguration ozoneConf) {
+    SCMContainerLocationFailoverProxyProvider proxyProvider =
+        new SCMContainerLocationFailoverProxyProvider(ozoneConf);
     StorageContainerLocationProtocol client =
         TracingUtil.createProxy(
             new StorageContainerLocationProtocolClientSideTranslatorPB(
-                RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
-                    scmAddress, UserGroupInformation.getCurrentUser(),
-                    ozoneConf,
-                    NetUtils.getDefaultSocketFactory(ozoneConf),
-                    Client.getRpcTimeout(ozoneConf))),
+                proxyProvider),
             StorageContainerLocationProtocol.class, ozoneConf);
     return client;
   }
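
Reviewer note (not part of the patch): below is a minimal sketch of how a client is expected to pick up the new failover behaviour, mirroring what newContainerRpcClient(), getScmContainerClient() and createStorageContainerLocationClient() now do. It is an assumption-laden illustration: the standalone class, the host names and the printed output are hypothetical, and it relies only on the constructor and public methods introduced by this diff (SCMContainerLocationFailoverProxyProvider(ConfigurationSource), getProxy(), performFailoverToAssignedLeader(String), close()) plus the ozone.scm.names key referenced via OZONE_SCM_NAMES.

// Hypothetical usage sketch -- not part of the patch.
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;

public final class ScmFailoverClientSketch {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Hypothetical SCM hosts: the provider builds one dummy node id ("scm1",
    // "scm2", ...) per entry and round-robins between them on failover.
    conf.set("ozone.scm.names",
        "scm1.example.com,scm2.example.com,scm3.example.com");

    SCMContainerLocationFailoverProxyProvider provider =
        new SCMContainerLocationFailoverProxyProvider(conf);

    // The translator wraps the provider in a RetryProxy, so a request answered
    // with SCM_NOT_LEADER triggers performFailoverToAssignedLeader() and is
    // retried against another SCM, up to the configured retry count.
    StorageContainerLocationProtocol scmClient =
        new StorageContainerLocationProtocolClientSideTranslatorPB(provider);

    // The failover bookkeeping can also be exercised directly.
    System.out.println("current SCM: " + provider.getProxy().proxyInfo);
    provider.performFailoverToAssignedLeader(null); // rotate to the next SCM
    System.out.println("after failover: " + provider.getProxy().proxyInfo);

    provider.close();
  }
}

The same wiring applies whether the caller is the OzoneManager, Recon, the admin CLI or Freon; only the ConfigurationSource passed to the provider differs.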