Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,16 @@
</description>
</property>

<property>
<name>ozone.om.ratis.server.role.check.interval</name>
<value>15s</value>
<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
<description>The interval at which the OM leader performs a role
check on its Ratis server. The Ratis server informs OM if it
loses the leader role. The scheduled check is a secondary
check to ensure that the leader role is updated periodically.
</description>
</property>

<property>
<name>ozone.acl.authorizer.class</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ private OMConfigKeys() {
OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
= TimeDuration.valueOf(120, TimeUnit.SECONDS);

// Configuration key for the interval between the role checks that the OM
// leader performs on its Ratis server.
public static final String OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY
= "ozone.om.ratis.server.role.check.interval";
// Default role check interval: 15 seconds.
public static final TimeDuration
OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
= TimeDuration.valueOf(15, TimeUnit.SECONDS);

public static final String OZONE_OM_KERBEROS_KEYTAB_FILE_KEY = "ozone.om."
+ "kerberos.keytab.file";
public static final String OZONE_OM_KERBEROS_PRINCIPAL_KEY = "ozone.om"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.exceptions;

import java.io.IOException;

/**
 * Signals that an OM node which is not the leader received a read request.
 * Thrown by
 * {@link org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB}.
 * <p>
 * When the rejecting node knows which peer to try next, that peer's id is
 * carried along and can be read back through
 * {@link #getSuggestedLeaderNodeId()}; otherwise the suggestion is
 * {@code null}.
 */
public class NotLeaderException extends IOException {

  // Id of the node that rejected the request. Stored, but this class offers
  // no accessor for it.
  private final String currentPeerId;
  // Id of the suggested leader, or null when none could be determined.
  private final String leaderPeerId;

  /**
   * Creates an exception for the case where the leader is unknown.
   *
   * @param currentPeerIdStr id of the non-leader node that got the request
   */
  public NotLeaderException(String currentPeerIdStr) {
    super(unknownLeaderMessage(currentPeerIdStr));
    this.leaderPeerId = null;
    this.currentPeerId = currentPeerIdStr;
  }

  /**
   * Creates an exception that also names the suggested leader.
   *
   * @param currentPeerIdStr id of the non-leader node that got the request
   * @param suggestedLeaderPeerIdStr id of the node suggested as the leader
   */
  public NotLeaderException(String currentPeerIdStr,
      String suggestedLeaderPeerIdStr) {
    super(suggestedLeaderMessage(currentPeerIdStr, suggestedLeaderPeerIdStr));
    this.leaderPeerId = suggestedLeaderPeerIdStr;
    this.currentPeerId = currentPeerIdStr;
  }

  /**
   * @return the id of the suggested leader node, or {@code null} when this
   * exception was created without a suggestion
   */
  public String getSuggestedLeaderNodeId() {
    return this.leaderPeerId;
  }

  /** Builds the message used when no leader could be determined. */
  private static String unknownLeaderMessage(String nodeId) {
    return "OM " + nodeId + " is not the leader. Could not " +
        "determine the leader node.";
  }

  /** Builds the message used when a leader suggestion is available. */
  private static String suggestedLeaderMessage(String nodeId,
      String suggestedId) {
    return "OM " + nodeId + " is not the leader. Suggested leader is "
        + suggestedId;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,14 @@ public Class<OzoneManagerProtocolPB> getInterface() {
* not match the current leaderOMNodeId cached by the proxy provider.
*/
public void performFailoverIfRequired(String newLeaderOMNodeId) {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
if (newLeaderOMNodeId == null) {
LOG.debug("No suggested leader nodeId. Performing failover to next peer" +
" node");
performFailover(null);
} else {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* the License.
*/

package org.apache.hadoop.ozone.om.ratis;
package org.apache.hadoop.ozone.om.helpers;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -25,8 +25,6 @@
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
Expand Down Expand Up @@ -54,14 +52,15 @@ private OMRatisHelper() {

/**
* Creates a new RaftClient object.
* @param rpcType Replication Type
* @param omId OM id of the client
* @param group RaftGroup
*
* @param rpcType Replication Type
* @param omId OM id of the client
* @param group RaftGroup
* @param retryPolicy Retry policy
* @return RaftClient object
*/
static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
group, RetryPolicy retryPolicy, Configuration conf) {
public static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
group, RetryPolicy retryPolicy, Configuration conf) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, omId, group);
final RaftProperties properties = new RaftProperties();
RaftConfigKeys.Rpc.setType(properties, rpcType);
Expand All @@ -85,36 +84,27 @@ static RaftPeerId getRaftPeerId(String omId) {
return RaftPeerId.valueOf(omId);
}

static ByteString convertRequestToByteString(OMRequest request) {
public static ByteString convertRequestToByteString(OMRequest request) {
byte[] requestBytes = request.toByteArray();
return ByteString.copyFrom(requestBytes);
}

static OMRequest convertByteStringToOMRequest(ByteString byteString)
public static OMRequest convertByteStringToOMRequest(ByteString byteString)
throws InvalidProtocolBufferException {
byte[] bytes = byteString.toByteArray();
return OMRequest.parseFrom(bytes);
}

static Message convertResponseToMessage(OMResponse response) {
public static Message convertResponseToMessage(OMResponse response) {
byte[] requestBytes = response.toByteArray();
return Message.valueOf(ByteString.copyFrom(requestBytes));
}

static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
throws InvalidProtocolBufferException {
public static OMResponse getOMResponseFromRaftClientReply(
RaftClientReply reply) throws InvalidProtocolBufferException {
byte[] bytes = reply.getMessage().getContent().toByteArray();
return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
.setLeaderOMNodeId(reply.getReplierId())
.build();
}

static OMResponse getErrorResponse(Type cmdType, Exception e) {
return OMResponse.newBuilder()
.setCmdType(cmdType)
.setSuccess(false)
.setMessage(e.getMessage())
.setStatus(Status.INTERNAL_ERROR)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
Expand Down Expand Up @@ -195,29 +196,49 @@ public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
private OzoneManagerProtocolPB createRetryProxy(
OMFailoverProxyProvider failoverProxyProvider,
int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {

RetryPolicy retryPolicyOnNetworkException = RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
maxFailovers, maxRetries, delayMillis, maxDelayBase);

RetryPolicy retryPolicy = new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception exception, int retries,
int failovers, boolean isIdempotentOrAtMostOnce)
throws Exception {
if (exception instanceof EOFException ||
exception instanceof ServiceException) {
if (retries < maxRetries && failovers < maxFailovers) {
return RetryAction.FAILOVER_AND_RETRY;

if (exception instanceof ServiceException) {
Throwable cause = exception.getCause();
if (cause instanceof NotLeaderException) {
NotLeaderException notLeaderException = (NotLeaderException) cause;
omFailoverProxyProvider.performFailoverIfRequired(
notLeaderException.getSuggestedLeaderNodeId());
return getRetryAction(RetryAction.RETRY, retries, failovers);
} else {
FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
"Attempted {} retries and {} failovers", retries, failovers);
return RetryAction.FAIL;
return getRetryAction(RetryAction.FAILOVER_AND_RETRY, retries,
failovers);
}
} else if (exception instanceof EOFException) {
return getRetryAction(RetryAction.FAILOVER_AND_RETRY, retries,
failovers);
} else {
return retryPolicyOnNetworkException.shouldRetry(
exception, retries, failovers, isIdempotentOrAtMostOnce);
exception, retries, failovers, isIdempotentOrAtMostOnce);
}
}

private RetryAction getRetryAction(RetryAction fallbackAction,
int retries, int failovers) {
if (retries < maxRetries && failovers < maxFailovers) {
return fallbackAction;
} else {
FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
"Attempted {} retries and {} failovers", retries, failovers);
return RetryAction.FAIL;
}
}
};

OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
return proxy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
Expand All @@ -75,7 +73,7 @@ public class TestOzoneManagerHA {
public ExpectedException exception = ExpectedException.none();

@Rule
public Timeout timeout = new Timeout(120_000);
public Timeout timeout = new Timeout(300_000);

/**
* Create a MiniOzoneCluster in HA mode for testing.
Expand All @@ -93,7 +91,6 @@ public void init() throws Exception {
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);

cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
Expand Down Expand Up @@ -313,4 +310,41 @@ public void testOMRetryProxy() throws Exception {
"3 retries and 3 failovers"));
}
}

/**
 * Verifies that a read issued through a client created against each OM in
 * turn succeeds, and that the client's proxy provider then points at the
 * node the shared client's provider reported before the loop.
 */
@Test
public void testReadRequest() throws Exception {
  // Create the volume that every iteration below reads back.
  final String volName = "volume" + RandomStringUtils.randomNumeric(5);
  objectStore.createVolume(volName);

  // Node id the shared client's proxy provider currently points to; used as
  // the expected value in the assertions below.
  final OMFailoverProxyProvider sharedProvider =
      objectStore.getClientProxy().getOMProxyProvider();
  final String expectedLeaderId = sharedProvider.getCurrentProxyOMNodeId();

  // A read request from any proxy should failover to the current leader OM
  int omIndex = 0;
  while (omIndex < numOfOMs) {
    final OzoneManager om = cluster.getOzoneManager(omIndex);
    final String host = om.getOmRpcServerAddr().getHostName();
    final int port = om.getOmRpcServerAddr().getPort();

    // Build a client against this OM's RPC address and get its provider.
    // NOTE(review): the client returned by getRpcClient is never closed
    // here - confirm whether it should be.
    final ObjectStore perOmStore =
        OzoneClientFactory.getRpcClient(host, port, conf).getObjectStore();
    final OMFailoverProxyProvider perOmProvider =
        perOmStore.getClientProxy().getOMProxyProvider();

    // NOTE(review): the failover is requested on the shared provider, not on
    // perOmProvider, which is the one asserted on below - confirm intended.
    final String targetNodeId = om.getOMNodId();
    sharedProvider.performFailoverIfRequired(targetNodeId);

    // The read itself must succeed and return the volume created above.
    final OzoneVolume fetched = perOmStore.getVolume(volName);
    Assert.assertEquals(volName, fetched.getName());

    // After the read, the per-OM provider is expected to point at the leader.
    final String actualNodeId = perOmProvider.getCurrentProxyOMNodeId();
    Assert.assertEquals(expectedLeaderId, actualNodeId);

    omIndex++;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1236,8 +1236,8 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
ProtobufRpcEngine.class);

BlockingService omService = newReflectiveBlockingService(
new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
isRatisEnabled));
new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisServer,
omRatisClient, isRatisEnabled));
return startRpcServer(configuration, omNodeRpcAddr,
OzoneManagerProtocolPB.class, omService,
handlerCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
Expand Down
Loading