Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.ratis;

import org.apache.hadoop.hdds.HddsUtils;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Exception thrown when a server is not the leader of its Ratis group.
 *
 * <p>Messages built by this class look like
 * {@code Server:<peerId> is not the leader. Suggested leader is
 * Server:<leader>.} when a leader is known, and
 * {@code Server:<peerId> is not the leader. Could not determine the leader
 * node.} otherwise. {@link #ServerNotLeaderException(String)} parses such a
 * message back into its parts.
 */
public class ServerNotLeaderException extends IOException {
  private final String currentPeerId;
  private final String leader;
  private static final Pattern CURRENT_PEER_ID_PATTERN =
      Pattern.compile("Server:(.*) is not the leader[.]+.*", Pattern.DOTALL);
  // The suggested leader is the run of non-whitespace characters following
  // "Suggested leader is Server:", without the sentence-ending dot. Dots
  // inside the value are kept, so "scm2.example.com:9863" or "10.0.0.2:9863"
  // is captured whole instead of being cut at the first dot. Anything after
  // the value must start with whitespace (presumably a trailing stack trace
  // on messages received from a remote server - hence DOTALL).
  private static final Pattern SUGGESTED_LEADER_PATTERN =
      Pattern.compile(
          ".*Suggested leader is Server:(\\S*?)[.]?(?:\\s.*)?",
          Pattern.DOTALL);

  /**
   * Creates the exception for the case where no leader is known.
   *
   * @param currentPeerId id of the server that is not the leader
   */
  public ServerNotLeaderException(RaftPeerId currentPeerId) {
    super("Server:" + currentPeerId + " is not the leader. Could not " +
        "determine the leader node.");
    this.currentPeerId = currentPeerId.toString();
    this.leader = null;
  }

  /**
   * Creates the exception with a suggested leader.
   *
   * @param currentPeerId id of the server that is not the leader
   * @param suggestedLeader the leader to suggest; returned unchanged by
   *                        {@link #getSuggestedLeader()}
   */
  public ServerNotLeaderException(RaftPeerId currentPeerId,
      String suggestedLeader) {
    super("Server:" + currentPeerId + " is not the leader. Suggested leader is"
        + " Server:" + suggestedLeader + ".");
    this.currentPeerId = currentPeerId.toString();
    this.leader = suggestedLeader;
  }

  /**
   * Re-creates the exception from a message in the format produced by the
   * other constructors. If the message is {@code null} or does not match
   * that format, both the peer id and the suggested leader are {@code null}.
   *
   * @param message the exception message to parse
   */
  public ServerNotLeaderException(String message) {
    super(message);

    String parsedPeerId = null;
    String parsedLeader = null;
    if (message != null) {
      Matcher peerIdMatcher = CURRENT_PEER_ID_PATTERN.matcher(message);
      if (peerIdMatcher.matches()) {
        parsedPeerId = peerIdMatcher.group(1);

        // The leader is only looked for in messages that name a peer.
        Matcher suggestedLeaderMatcher =
            SUGGESTED_LEADER_PATTERN.matcher(message);
        if (suggestedLeaderMatcher.matches()) {
          parsedLeader = suggestedLeaderMatcher.group(1);
        }
      }
    }
    this.currentPeerId = parsedPeerId;
    this.leader = parsedLeader;
  }

  /**
   * @return the suggested leader, or {@code null} if none is known
   */
  public String getSuggestedLeader() {
    return leader;
  }

  /**
   * Convert {@link org.apache.ratis.protocol.exceptions.NotLeaderException}
   * to {@link ServerNotLeaderException}.
   *
   * @param notLeaderException the Ratis exception to convert
   * @param currentPeer id of the server that is not the leader
   * @param port port appended (as {@code host:port}) to the host name of the
   *             suggested leader, when there is one
   * @return ServerNotLeaderException carrying {@code host:port} of the
   *         suggested leader, or no leader if the Ratis exception has none
   */
  public static ServerNotLeaderException convertToNotLeaderException(
      NotLeaderException notLeaderException,
      RaftPeerId currentPeer, String port) {
    // NOTE(review): get() is called unconditionally on the result of
    // getHostName - confirm a host name can always be derived from the
    // suggested leader's address.
    String suggestedLeader = notLeaderException.getSuggestedLeader() != null ?
        HddsUtils
            .getHostName(notLeaderException.getSuggestedLeader().getAddress())
            .get() :
        null;
    ServerNotLeaderException serverNotLeaderException;
    if (suggestedLeader != null) {
      String suggestedLeaderHostPort = suggestedLeader + ":" + port;
      serverNotLeaderException =
          new ServerNotLeaderException(currentPeer, suggestedLeaderHostPort);
    } else {
      serverNotLeaderException = new ServerNotLeaderException(currentPeer);
    }
    return serverNotLeaderException;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.ha;

import java.io.IOException;

/**
 * Marker exception for failures for which there should be no retry. The
 * original {@link IOException} is carried as the cause.
 */
public class NonRetriableException extends IOException {

  /**
   * Wraps the given exception.
   *
   * @param exception the exception to wrap; it becomes the cause
   */
  public NonRetriableException(IOException exception) {
    super(exception);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.ratis.protocol.exceptions.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -198,12 +201,40 @@ public static Collection<String> getSCMNodeIds(
return getSCMNodeIds(configuration, scmServiceId);
}

/**
 * Unwraps a {@link RemoteException} that is the direct cause of the given
 * exception.
 *
 * @param e the exception to inspect
 * @return the result of {@code unwrapRemoteException()} when the direct
 *         cause is a RemoteException and that result is non-null;
 *         otherwise {@code e} itself
 */
private static Throwable unwrapException(Exception e) {
  final Throwable directCause = e.getCause();
  if (!(directCause instanceof RemoteException)) {
    return e;
  }
  final IOException unwrapped =
      ((RemoteException) directCause).unwrapRemoteException();
  return unwrapped != null ? unwrapped : e;
}

/**
 * Checks whether a {@link StateMachineException} can be found for the given
 * exception. Used by SCM clients.
 *
 * @param e the exception to inspect
 * @return {@code true} if the lookup finds a StateMachineException,
 *         {@code false} otherwise
 */
public static boolean isNonRetriableException(Exception e) {
  return getExceptionForClass(e, StateMachineException.class) != null;
}

/**
 * Checks whether the given exception, once unwrapped, is a
 * {@link NonRetriableException}. Used by SCM clients.
 *
 * @param e the exception to inspect
 * @return {@code true} if the unwrapped exception is a NonRetriableException
 */
public static boolean checkNonRetriableException(Exception e) {
  return unwrapException(e) instanceof NonRetriableException;
}

// Checks the given exception, unwrapping it as needed, against the
// retriable-with-no-failover exception list. Returns true if it matches
// one of the listed classes, and false otherwise.
public static boolean isRetriableWithNoFailoverException(Exception e) {
Throwable t = e;
while (t != null && t.getCause() != null) {
while (t != null) {
for (Class<? extends Exception> clazz :
getRetriableWithNoFailoverExceptionList()) {
if (clazz.isInstance(t)) {
Expand All @@ -215,20 +246,57 @@ public static boolean isRetriableWithNoFailoverException(Exception e) {
return false;
}

/**
 * Checks whether the given exception, once unwrapped, is a
 * {@link RetriableWithNoFailoverException}. Used by SCM clients.
 *
 * @param e the exception to inspect
 * @return {@code true} if the unwrapped exception is a
 *         RetriableWithNoFailoverException
 */
public static boolean checkRetriableWithNoFailoverException(Exception e) {
  return unwrapException(e) instanceof RetriableWithNoFailoverException;
}

/**
 * Looks up a {@link NotLeaderException} for the given exception by
 * delegating to {@link #getExceptionForClass(Exception, Class)}.
 *
 * @param e the exception to inspect
 * @return the NotLeaderException that was found, or {@code null} if there
 *         is none
 */
public static Throwable getNotLeaderException(Exception e) {
return getExceptionForClass(e, NotLeaderException.class);
}

/**
 * Looks up a {@link ServerNotLeaderException} for the given exception by
 * delegating to {@link #getExceptionForClass(Exception, Class)}.
 *
 * @param e the exception to inspect
 * @return the ServerNotLeaderException that was found, or {@code null} if
 *         there is none
 */
public static Throwable getServerNotLeaderException(Exception e) {
return getExceptionForClass(e, ServerNotLeaderException.class);
}

/**
 * Searches for an exception of the requested type.
 *
 * <p>The search starts at {@code e}, unless the direct cause of {@code e}
 * is a {@link RemoteException} whose {@code unwrapRemoteException()} result
 * is non-null, in which case it starts at that unwrapped exception. From
 * the starting point the cause chain is followed until a match is found.
 *
 * @param e the exception to inspect
 * @param clazz the exception type to look for
 * @return the first throwable in the chain that is an instance of
 *         {@code clazz}, or {@code null} if there is none
 */
public static Throwable getExceptionForClass(Exception e,
    Class<? extends Exception> clazz) {
  Throwable start = e;
  final Throwable directCause = e.getCause();
  if (directCause instanceof RemoteException) {
    final IOException unwrapped =
        ((RemoteException) directCause).unwrapRemoteException();
    if (unwrapped != null) {
      start = unwrapped;
    }
  }
  for (Throwable current = start; current != null;
       current = current.getCause()) {
    if (clazz.isInstance(current)) {
      return current;
    }
  }
  return null;
}

/**
 * Returns the list of exception classes held in
 * RETRIABLE_WITH_NO_FAILOVER_EXCEPTION_LIST. The list object itself is
 * returned, not a copy.
 *
 * @return the retriable-with-no-failover exception classes
 */
public static List<Class<? extends
Exception>> getRetriableWithNoFailoverExceptionList() {
return RETRIABLE_WITH_NO_FAILOVER_EXCEPTION_LIST;
}

public static RetryPolicy.RetryAction getRetryAction(int failovers, int retry,
Exception e, int maxRetryCount, long retryInterval) {
if (SCMHAUtils.isRetriableWithNoFailoverException(e)) {
if (SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
if (retry < maxRetryCount) {
return new RetryPolicy.RetryAction(
RetryPolicy.RetryAction.RetryDecision.RETRY, retryInterval);
} else {
return RetryPolicy.RetryAction.FAIL;
}
} else if (SCMHAUtils.checkNonRetriableException(e)) {
return RetryPolicy.RetryAction.FAIL;
} else {
if (failovers < maxRetryCount) {
return new RetryPolicy.RetryAction(
Expand Down
16 changes: 8 additions & 8 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,14 @@
</description>
</property>

<property>
<name>ozone.client.key.provider.cache.expiry</name>
<tag>OZONE, CLIENT, SECURITY</tag>
<value>10d</value>
<description>Ozone client security key provider cache expiration time.
</description>
</property>

<property>
<name>ozone.scm.info.wait.duration</name>
<tag>OZONE, SCM, OM</tag>
Expand All @@ -2773,12 +2781,4 @@
</description>
</property>

<property>
<name>ozone.client.key.provider.cache.expiry</name>
<tag>OZONE, CLIENT, SECURITY</tag>
<value>10d</value>
<description>Ozone client security key provider cache expiration time.
</description>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type;
Expand Down Expand Up @@ -119,11 +118,6 @@ private SCMBlockLocationResponse submitRequest(
try {
SCMBlockLocationResponse response =
rpcProxy.send(NULL_RPC_CONTROLLER, req);
if (response.getStatus() ==
ScmBlockLocationProtocolProtos.Status.SCM_NOT_LEADER) {
failoverProxyProvider
.performFailoverToAssignedLeader(response.getLeaderSCMNodeId());
}
return response;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
Expand Down Expand Up @@ -103,7 +103,7 @@ public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
this.retryInterval = config.getRetryInterval();
}

private void loadConfigs() {
private synchronized void loadConfigs() {

scmNodeIds = new ArrayList<>();
List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
Expand Down Expand Up @@ -131,7 +131,14 @@ private void loadConfigs() {
}

@VisibleForTesting
public synchronized String getCurrentProxyOMNodeId() {
public synchronized void changeCurrentProxy(String nodeId) {
// Records the position of nodeId within scmNodeIds and the id itself as the
// current proxy, then calls nextProxyIndex().
// NOTE(review): indexOf yields -1 when nodeId is not in scmNodeIds, and
// nextProxyIndex() is not visible here - presumably it advances from the
// index just set; confirm both against its implementation.
currentProxyIndex = scmNodeIds.indexOf(nodeId);
currentProxySCMNodeId = nodeId;
nextProxyIndex();
}

/**
 * Returns the value of currentProxySCMNodeId, read while holding this
 * object's monitor.
 *
 * @return the node id recorded as the current proxy
 */
@VisibleForTesting
public synchronized String getCurrentProxySCMNodeId() {
return currentProxySCMNodeId;
}

Expand All @@ -143,15 +150,28 @@ public synchronized ProxyInfo getProxy() {
}

@Override
public void performFailover(ScmBlockLocationProtocolPB newLeader) {
public synchronized void performFailover(
ScmBlockLocationProtocolPB newLeader) {
// Should do nothing here.
LOG.debug("Failing over to next proxy. {}", getCurrentProxyOMNodeId());
LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
}

public void performFailoverToAssignedLeader(String newLeader) {
public synchronized void performFailoverToAssignedLeader(String newLeader,
Exception e) {
ServerNotLeaderException snle =
(ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e);
if (snle != null && snle.getSuggestedLeader() != null) {
newLeader = scmProxyInfoMap.values().stream().filter(
proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress())
.equals(snle.getSuggestedLeader())).findFirst().get().getNodeId();
LOG.debug("Performing failover to suggested leader {}, nodeId {}",
snle.getSuggestedLeader(), newLeader);
}
if (newLeader == null) {
// If newLeader is not assigned, it will fail over to next proxy.
nextProxyIndex();
LOG.debug("Performing failover to next proxy node {}",
currentProxySCMNodeId);
} else {
if (!assignLeaderToNode(newLeader)) {
LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
Expand Down Expand Up @@ -249,8 +269,8 @@ public RetryPolicy getSCMBlockLocationRetryPolicy(String newLeader) {
@Override
public RetryAction shouldRetry(Exception e, int retry,
int failover, boolean b) {
if (!SCMHAUtils.isRetriableWithNoFailoverException(e)) {
performFailoverToAssignedLeader(newLeader);
if (!SCMHAUtils.checkRetriableWithNoFailoverException(e)) {
performFailoverToAssignedLeader(newLeader, e);
}
return SCMHAUtils.getRetryAction(failover, retry, e, maxRetryCount,
getRetryInterval());
Expand Down
Loading