Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.om;

import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT;

/**
* Test Ozone Manager HA Metrics.
*/
public class TestOzoneManagerHAMetrics extends TestOzoneManagerHA {

@Test
public void testOMHAMetrics() throws Exception {
waitForLeaderToBeReady();

// Get leader OM
OzoneManager leaderOM = getCluster().getOMLeader();
// Store current leader's node ID,
// to use it after restarting the OM
String leaderOMId = leaderOM.getOMNodeId();
// Get a list of all OMs
List<OzoneManager> omList = getCluster().getOzoneManagersList();
// Check metrics for all OMs
checkOMHAMetricsForAllOMs(omList, leaderOMId);

// Restart leader OM
getCluster().shutdownOzoneManager(leaderOM);
getCluster().restartOzoneManager(leaderOM, true);
waitForLeaderToBeReady();

// Get the new leader
OzoneManager newLeaderOM = getCluster().getOMLeader();
String newLeaderOMId = newLeaderOM.getOMNodeId();
// Get a list of all OMs again
omList = getCluster().getOzoneManagersList();
// New state for the old leader
int newState = leaderOMId.equals(newLeaderOMId) ? 1 : 0;

// Get old leader
OzoneManager oldLeader = getCluster().getOzoneManager(leaderOMId);
// Get old leader's metrics
OMHAMetrics omhaMetrics = oldLeader.getOmhaMetrics();

Assertions.assertEquals(newState,
omhaMetrics.getOmhaInfoOzoneManagerHALeaderState());

// Check that metrics for all OMs have been updated
checkOMHAMetricsForAllOMs(omList, newLeaderOMId);
}

private void checkOMHAMetricsForAllOMs(List<OzoneManager> omList,
String leaderOMId) {
for (OzoneManager om : omList) {
// Get OMHAMetrics for the current OM
OMHAMetrics omhaMetrics = om.getOmhaMetrics();
String nodeId = om.getOMNodeId();

// If current OM is leader, state should be 1
int expectedState = nodeId
.equals(leaderOMId) ? 1 : 0;
Assertions.assertEquals(expectedState,
omhaMetrics.getOmhaInfoOzoneManagerHALeaderState());

Assertions.assertEquals(nodeId, omhaMetrics.getOmhaInfoNodeId());
}
}


/**
* After restarting OMs we need to wait
* for a leader to be elected and ready.
*/
private void waitForLeaderToBeReady()
throws InterruptedException, TimeoutException {
// Wait for Leader Election timeout
int timeout = OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
.toIntExact(TimeUnit.MILLISECONDS);
GenericTestUtils.waitFor(() ->
getCluster().getOMLeader() != null, 500, timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.ozone.test.GenericTestUtils;
Expand All @@ -42,7 +41,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOUT;
Expand Down Expand Up @@ -106,85 +104,6 @@ public void testMultipartUpload() throws Exception {
testMultipartUploadWithOneOmNodeDown();
}

@Test
@Flaky("HDDS-8035")
public void testOMHAMetrics() throws InterruptedException,
TimeoutException, IOException {
waitForLeaderToBeReady();

// Get leader OM
OzoneManager leaderOM = getCluster().getOMLeader();
// Store current leader's node ID,
// to use it after restarting the OM
String leaderOMId = leaderOM.getOMNodeId();
// Get a list of all OMs
List<OzoneManager> omList = getCluster().getOzoneManagersList();

// Check metrics for all OMs
checkOMHAMetricsForAllOMs(omList, leaderOMId);

// Restart current leader OM
leaderOM.stop();
leaderOM.restart();

waitForLeaderToBeReady();

// Get the new leader
OzoneManager newLeaderOM = getCluster().getOMLeader();
String newLeaderOMId = newLeaderOM.getOMNodeId();
// Get a list of all OMs again
omList = getCluster().getOzoneManagersList();

// New state for the old leader
int newState = leaderOMId.equals(newLeaderOMId) ? 1 : 0;

// Get old leader
OzoneManager oldLeader = getCluster().getOzoneManager(leaderOMId);
// Get old leader's metrics
OMHAMetrics omhaMetrics = oldLeader.getOmhaMetrics();

Assertions.assertEquals(newState,
omhaMetrics.getOmhaInfoOzoneManagerHALeaderState());

// Check that metrics for all OMs have been updated
checkOMHAMetricsForAllOMs(omList, newLeaderOMId);
}

private void checkOMHAMetricsForAllOMs(List<OzoneManager> omList,
String leaderOMId) {
for (OzoneManager om : omList) {
// Get OMHAMetrics for the current OM
OMHAMetrics omhaMetrics = om.getOmhaMetrics();
String nodeId = om.getOMNodeId();

// If current OM is leader, state should be 1
int expectedState = nodeId
.equals(leaderOMId) ? 1 : 0;

Assertions.assertEquals(expectedState,
omhaMetrics.getOmhaInfoOzoneManagerHALeaderState());

Assertions.assertEquals(nodeId, omhaMetrics.getOmhaInfoNodeId());
}
}


/**
* Some tests are stopping or restarting OMs.
* There are test cases where we might need to
* wait for a leader to be elected and ready.
*/
private void waitForLeaderToBeReady()
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(() -> {
try {
return getCluster().getOMLeader().isLeaderReady();
} catch (Exception e) {
return false;
}
}, 1000, 80000);
}

@Test
public void testFileOperationsAndDelete() throws Exception {
testFileOperationsWithRecursive();
Expand Down Expand Up @@ -528,7 +447,7 @@ public void testOMRestart() throws Exception {
followerOM1.stop();

// Do more transactions. Stopped OM should miss these transactions and
// the logs corresponding to atleast some of the missed transactions
// the logs corresponding to at least some missed transactions
// should be purged. This will force the OM to install snapshot when
// restarted.
long minNewTxIndex = followerOM1LastAppliedIndex + getLogPurgeGap() * 10L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1909,17 +1909,6 @@ public void updatePeerList(List<String> newPeers) {
}
}
}
RaftPeer leader = omRatisServer.getLeader();
if (Objects.nonNull(leader)) {
// If we have any leader information, its id cannot be null.
String leaderId = leader.getId().toString();
omHAMetricsInit(leaderId);
} else {
LOG.error("OzoneManagerRatisServer leader is null, " +
"unregistering OMHAMetrics.");
// Unregister, to get rid of stale metrics
OMHAMetrics.unRegister();
}
}

/**
Expand Down Expand Up @@ -2218,6 +2207,10 @@ public void stop() {
if (certClient != null) {
certClient.close();
}

if (omhaMetrics != null) {
OMHAMetrics.unRegister();
}
} catch (Exception e) {
LOG.error("OzoneManager stop failed.", e);
}
Expand Down Expand Up @@ -2941,9 +2934,9 @@ public String getRatisRoles() {
/**
* Create OMHAMetrics instance.
*/
private void omHAMetricsInit(String leaderId) {
public void omHAMetricsInit(String leaderId) {
// unregister, in case metrics already exist
// so that the metric tags will get updated.
// so that the metrics will get updated.
OMHAMetrics.unRegister();
omhaMetrics = OMHAMetrics
.create(getOMNodeId(), leaderId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
Expand Down Expand Up @@ -156,6 +157,13 @@ public SnapshotInfo getLatestSnapshot() {
return snapshotInfo;
}

@Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
RaftPeerId newLeaderId) {
// Initialize OMHAMetrics
ozoneManager.omHAMetricsInit(newLeaderId.toString());
}

/**
* Called to notify state machine about indexes which are processed
* internally by Raft Server, this currently happens when conf entries are
Expand Down