Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone;

import com.google.protobuf.ServiceException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -36,12 +37,15 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.conf.OMClientConfig;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager;

import com.google.common.base.Joiner;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -59,6 +63,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -575,4 +580,25 @@ public static String getOzoneManagerServiceId(OzoneConfiguration conf)
return serviceId;
}
}

/**
* Unwrap exception to check if it is some kind of access control problem
* ({@link AccessControlException} or {@link SecretManager.InvalidToken}).
*/
public static boolean isAccessControlException(Exception ex) {
if (ex instanceof ServiceException) {
Throwable t = ex.getCause();
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
while (t != null) {
if (t instanceof AccessControlException ||
t instanceof SecretManager.InvalidToken) {
return true;
}
t = t.getCause();
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hadoop.ozone.om.ha;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -32,28 +35,31 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;

import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;

/**
* A failover proxy provider implementation which allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
Expand All @@ -80,6 +86,7 @@ public class OMFailoverProxyProvider implements

private final String omServiceId;

private List<String> retryExceptions = new ArrayList<>();

// OMFailoverProxyProvider, on encountering certain exception, tries each OM
// once in a round robin fashion. After that it waits for configured time
Expand All @@ -90,6 +97,7 @@ public class OMFailoverProxyProvider implements
private String lastAttemptedOM;
private int numAttemptsOnSameOM = 0;
private final long waitBetweenRetries;
private Set<String> accessControlExceptionOMs = new HashSet<>();

public OMFailoverProxyProvider(ConfigurationSource configuration,
UserGroupInformation ugi, String omServiceId) throws IOException {
Expand All @@ -108,12 +116,7 @@ public OMFailoverProxyProvider(ConfigurationSource configuration,
OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
}

public OMFailoverProxyProvider(OzoneConfiguration configuration,
UserGroupInformation ugi) throws IOException {
this(configuration, ugi, null);
}

private void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
throws IOException {
this.omProxies = new HashMap<>();
this.omProxyInfos = new HashMap<>();
Expand Down Expand Up @@ -203,7 +206,7 @@ public synchronized ProxyInfo getProxy() {
/**
* Creates proxy object if it does not already exist.
*/
private void createOMProxyIfNeeded(ProxyInfo proxyInfo,
protected void createOMProxyIfNeeded(ProxyInfo proxyInfo,
String nodeId) {
if (proxyInfo.proxy == null) {
InetSocketAddress address = omProxyInfos.get(nodeId).getAddress();
Expand All @@ -223,11 +226,90 @@ private void createOMProxyIfNeeded(ProxyInfo proxyInfo,
}
}

@VisibleForTesting
public RetryPolicy getRetryPolicy(int maxFailovers) {
// Client will attempt upto maxFailovers number of failovers between
// available OMs before throwing exception.
RetryPolicy retryPolicy = new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception exception, int retries,
int failovers, boolean isIdempotentOrAtMostOnce)
throws Exception {

if (LOG.isDebugEnabled()) {
if (exception.getCause() != null) {
LOG.debug("RetryProxy: OM {}: {}: {}", getCurrentProxyOMNodeId(),
exception.getCause().getClass().getSimpleName(),
exception.getCause().getMessage());
} else {
LOG.debug("RetryProxy: OM {}: {}", getCurrentProxyOMNodeId(),
exception.getMessage());
}
}
retryExceptions.add(getExceptionMsg(exception, failovers));

if (exception instanceof ServiceException) {
OMNotLeaderException notLeaderException =
getNotLeaderException(exception);
if (notLeaderException != null) {
// TODO: NotLeaderException should include the host
// address of the suggested leader along with the nodeID.
// Failing over just based on nodeID is not very robust.

// OMFailoverProxyProvider#performFailover() is a dummy call and
// does not perform any failover. Failover manually to the next OM.
performFailoverToNextProxy();
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
}

OMLeaderNotReadyException leaderNotReadyException =
getLeaderNotReadyException(exception);
if (leaderNotReadyException != null) {
// Retry on same OM again as leader OM is not ready.
// Failing over to same OM so that wait time between retries is
// incremented
performFailoverIfRequired(getCurrentProxyOMNodeId());
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
}
}

if (!shouldFailover(exception)) {
return RetryAction.FAIL; // do not retry
}

// For all other exceptions, fail over manually to the next OM Node
// proxy.
performFailoverToNextProxy();
return getRetryAction(RetryDecision.FAILOVER_AND_RETRY, failovers);
}

private RetryAction getRetryAction(RetryDecision fallbackAction,
int failovers) {
if (failovers < maxFailovers) {
return new RetryAction(fallbackAction, getWaitTime());
} else {
StringBuilder allRetryExceptions = new StringBuilder();
allRetryExceptions.append("\n");
retryExceptions.stream().forEach(e -> allRetryExceptions.append(e)
.append("\n"));
LOG.error("Failed to connect to OMs: {}. Attempted {} failovers. " +
"Got following exceptions during retries: {}",
getOMProxyInfos(), maxFailovers,
allRetryExceptions.toString());
retryExceptions.clear();
return RetryAction.FAIL;
}
}
};

return retryPolicy;
}

public Text getCurrentProxyDelegationToken() {
return delegationTokenService;
}

private Text computeDelegationTokenService() {
protected Text computeDelegationTokenService() {
// For HA, this will return "," separated address of all OM's.
List<String> addresses = new ArrayList<>();

Expand Down Expand Up @@ -351,7 +433,7 @@ private synchronized int getCurrentProxyIndex() {

public synchronized long getWaitTime() {
if (currentProxyOMNodeId.equals(lastAttemptedOM)) {
// Clear attemptedOMs list as round robin has been broken. Add only the
// Clear attemptedOMs list as round robin has been broken.
attemptedOMs.clear();

// The same OM will be contacted again. So wait and then retry.
Expand All @@ -375,6 +457,23 @@ public synchronized long getWaitTime() {
return waitBetweenRetries;
}

public synchronized boolean shouldFailover(Exception ex) {
if (OmUtils.isAccessControlException(ex)) {
// Retry all available OMs once before failing with
// AccessControlException.
if (accessControlExceptionOMs.contains(currentProxyOMNodeId)) {
accessControlExceptionOMs.clear();
return false;
} else {
accessControlExceptionOMs.add(currentProxyOMNodeId);
if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
return false;
}
}
}
return true;
}

/**
* Close all the proxy objects which have been opened over the lifetime of
* the proxy provider.
Expand All @@ -398,5 +497,69 @@ public List<ProxyInfo> getOMProxies() {
public List<OMProxyInfo> getOMProxyInfos() {
return new ArrayList<OMProxyInfo>(omProxyInfos.values());
}

private static String getExceptionMsg(Exception e, int retryAttempt) {
StringBuilder exceptionMsg = new StringBuilder()
.append("Retry Attempt ")
.append(retryAttempt)
.append(" Exception - ");
if (e.getCause() == null) {
exceptionMsg.append(e.getClass().getCanonicalName())
.append(": ")
.append(e.getMessage());
} else {
exceptionMsg.append(e.getCause().getClass().getCanonicalName())
.append(": ")
.append(e.getCause().getMessage());
}
return exceptionMsg.toString();
}

/**
* Check if exception is OMLeaderNotReadyException.
*
* @param exception
* @return OMLeaderNotReadyException
*/
private static OMLeaderNotReadyException getLeaderNotReadyException(
Exception exception) {
Throwable cause = exception.getCause();
if (cause instanceof RemoteException) {
IOException ioException =
((RemoteException) cause).unwrapRemoteException();
if (ioException instanceof OMLeaderNotReadyException) {
return (OMLeaderNotReadyException) ioException;
}
}
return null;
}

/**
* Check if exception is a OMNotLeaderException.
*
* @return OMNotLeaderException.
*/
public static OMNotLeaderException getNotLeaderException(
Exception exception) {
Throwable cause = exception.getCause();
if (cause instanceof RemoteException) {
IOException ioException =
((RemoteException) cause).unwrapRemoteException();
if (ioException instanceof OMNotLeaderException) {
return (OMNotLeaderException) ioException;
}
}
return null;
}

@VisibleForTesting
protected void setProxiesForTesting(
Map<String, ProxyInfo<OzoneManagerProtocolPB>> testOMProxies,
Map<String, OMProxyInfo> testOMProxyInfos,
List<String> testOMNodeIDList) {
this.omProxies = testOMProxies;
this.omProxyInfos = testOMProxyInfos;
this.omNodeIDList = testOMNodeIDList;
}
}

Loading