Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -170,6 +171,8 @@ private class RatisServerStub implements SCMRatisServer {
// Handler objects of this stub, keyed by request type.
private Map<RequestType, Object> handlers =
new EnumMap<>(RequestType.class);

// Peer id built from a random UUID when the stub is created; getLeaderId() returns it.
private RaftPeerId leaderId = RaftPeerId.valueOf(UUID.randomUUID().toString());

/** No-op in this stub implementation. */
@Override
public void start() {
}
Expand Down Expand Up @@ -283,5 +286,10 @@ public boolean removeSCM(RemoveSCMRequest request) throws IOException {
public GrpcTlsConfig getGrpcTlsConfig() {
// This stub always reports no gRPC TLS configuration.
return null;
}

/**
 * Returns the leader id held by this stub.
 *
 * @return the value of the {@code leaderId} field
 */
@Override
public RaftPeerId getLeaderId() {
  return this.leaderId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;

Expand Down Expand Up @@ -68,4 +69,6 @@ SCMRatisResponse submitRequest(SCMRatisRequest request)

/**
 * Returns the gRPC TLS configuration of this server.
 *
 * @return the TLS configuration; callers should tolerate {@code null}, since at least one
 *         implementation returns {@code null}
 */
GrpcTlsConfig getGrpcTlsConfig();

/**
 * Returns the id of the Raft peer this server reports as leader.
 *
 * @return the leader's peer id; implementations may return {@code null} when they have no
 *         leader to report
 */
RaftPeerId getLeaderId();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import jakarta.annotation.Nullable;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand Down Expand Up @@ -147,6 +148,16 @@ public GrpcTlsConfig getGrpcTlsConfig() {
return grpcTlsConfig;
}

/**
 * Returns the id of the Raft peer that {@code getLeader()} currently reports.
 *
 * @return the leader's peer id, or {@code null} when {@code getLeader()} returns {@code null}
 */
@Override
@Nullable
public RaftPeerId getLeaderId() {
  final RaftPeer currentLeader = getLeader();
  return currentLeader == null ? null : currentLeader.getId();
}

private static void waitForLeaderToBeReady(RaftServer server,
OzoneConfiguration conf, RaftGroup group) throws IOException {
boolean ready;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1595,8 +1595,9 @@ public void start() throws IOException {

setStartTime();

// At this point leader is not known
scmHAMetricsUpdate(null);
RaftPeerId leaderId = SCMHAUtils.isSCMHAEnabled(configuration)
? getScmHAManager().getRatisServer().getLeaderId() : null;
scmHAMetricsUpdate(Objects.toString(leaderId, null));

if (scmCertificateClient != null) {
// In case root CA certificate is rotated during this SCM is offline
Expand Down Expand Up @@ -2298,7 +2299,6 @@ public void scmHAMetricsUpdate(String leaderId) {
// unregister, in case metrics already exist
// so that the metric tags will get updated.
SCMHAMetrics.unRegister();

scmHAMetrics = SCMHAMetrics.create(getScmId(), leaderId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -31,6 +32,7 @@
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -111,6 +113,11 @@ public SCMStateMachine getSCMStateMachine() {
public GrpcTlsConfig getGrpcTlsConfig() {
// This test server always reports no gRPC TLS configuration.
return null;
}

/**
 * Returns a peer id built from a freshly generated random UUID.
 * A new id is produced on every call.
 */
@Override
public RaftPeerId getLeaderId() {
  final String randomPeerName = UUID.randomUUID().toString();
  return RaftPeerId.valueOf(randomPeerName);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;

import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
 * Unit test for {@link SCMRatisServerImpl}, run against mocked Ratis and security classes.
 */
public class TestSCMRatisServerImpl {

  /**
   * Checks that {@code getLeaderId()} returns the id of the peer supplied by
   * {@code getLeader()}, and {@code null} when {@code getLeader()} supplies no peer.
   */
  @Test
  public void testGetLeaderId() throws Exception {
    try (
        MockedConstruction<SecurityConfig> securityConfigMocks = mockConstruction(SecurityConfig.class);
        MockedStatic<RaftServer> raftServerStatics = mockStatic(RaftServer.class);
        MockedStatic<RatisUtil> ratisUtilStatics = mockStatic(RatisUtil.class)
    ) {
      // given: mocked collaborators for the server constructor
      ConfigurationSource conf = mock(ConfigurationSource.class);
      StorageContainerManager scm = mock(StorageContainerManager.class);
      String clusterId = "CID-" + UUID.randomUUID();
      when(scm.getClusterId()).thenReturn(clusterId);
      SCMHADBTransactionBuffer dbTransactionBuffer = mock(SCMHADBTransactionBuffer.class);

      // RaftServer.newBuilder() hands out a builder whose build() yields the mocked server
      RaftServer raftServer = newRaftServerMock();
      RaftServer.Builder raftServerBuilder = newRaftServerBuilderMock(raftServer);
      raftServerStatics.when(RaftServer::newBuilder).thenReturn(raftServerBuilder);

      RaftProperties raftProperties = mock(RaftProperties.class);
      ratisUtilStatics.when(() -> RatisUtil.newRaftProperties(conf)).thenReturn(raftProperties);

      // NOTE(review): with construction mocking active, this "new" yields its own mock instance;
      // confirm whether this stubbing reaches the instance used by SCMRatisServerImpl.
      SecurityConfig sc = new SecurityConfig(conf);
      when(sc.isSecurityEnabled()).thenReturn(false);

      SCMRatisServerImpl scmRatisServer = spy(new SCMRatisServerImpl(conf, scm, dbTransactionBuffer));
      RaftPeerId expectedLeaderId = RaftPeerId.valueOf("peer1");
      RaftPeer leaderPeer = RaftPeer.newBuilder().setId(expectedLeaderId).build();
      doReturn(leaderPeer).when(scmRatisServer).getLeader();

      // when a leader is reported, then its id is returned
      assertEquals(expectedLeaderId, scmRatisServer.getLeaderId());

      // but when no leader is reported, then null is returned
      doReturn(null).when(scmRatisServer).getLeader();
      assertNull(scmRatisServer.getLeaderId());
    }
  }

  /**
   * Creates a mocked {@link RaftServer} whose every division exposes a mocked
   * {@link SCMStateMachine}.
   */
  private static RaftServer newRaftServerMock() throws Exception {
    RaftServer server = mock(RaftServer.class);
    RaftServer.Division division = mock(RaftServer.Division.class);
    when(server.getDivision(any())).thenReturn(division);

    SCMStateMachine stateMachine = mock(SCMStateMachine.class);
    when(division.getStateMachine()).thenReturn(stateMachine);
    return server;
  }

  /**
   * Creates a mocked {@link RaftServer.Builder} whose setters return the builder itself and whose
   * {@code build()} returns the given server.
   */
  private static RaftServer.Builder newRaftServerBuilderMock(RaftServer builtServer) throws Exception {
    RaftServer.Builder builder = mock(RaftServer.Builder.class);
    when(builder.setServerId(any())).thenReturn(builder);
    when(builder.setProperties(any())).thenReturn(builder);
    when(builder.setStateMachineRegistry(any())).thenReturn(builder);
    when(builder.setOption(any())).thenReturn(builder);
    when(builder.setGroup(any())).thenReturn(builder);
    when(builder.setParameters(any())).thenReturn(builder);
    when(builder.build()).thenReturn(builtServer);
    return builder;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.ha.SCMHAMetrics;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
Expand All @@ -43,9 +44,10 @@
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -54,7 +56,9 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
Expand All @@ -72,6 +76,8 @@
@Timeout(300)
public class TestStorageContainerManagerHA {

private static final Logger LOG = LoggerFactory.getLogger(TestStorageContainerManagerHA.class);

private MiniOzoneHAClusterImpl cluster = null;
private OzoneConfiguration conf;
private String omServiceId;
Expand All @@ -86,7 +92,6 @@ public class TestStorageContainerManagerHA {
*
* @throws IOException
*/
@BeforeEach
public void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
Expand Down Expand Up @@ -118,6 +123,7 @@ public void shutdown() {

@Test
void testAllSCMAreRunning() throws Exception {
init();
int count = 0;
List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
assertEquals(numOfSCMs, scms.size());
Expand All @@ -129,6 +135,9 @@ void testAllSCMAreRunning() throws Exception {
count++;
leaderScm = scm;
}
if (SCMHAUtils.isSCMHAEnabled(conf)) {
assertNotNull(scm.getScmHAManager().getRatisServer().getLeaderId());
}
assertEquals(peerSize, numOfSCMs);
}
assertEquals(1, count);
Expand Down Expand Up @@ -246,6 +255,7 @@ private boolean areAllScmInSync(long leaderIndex) {

@Test
public void testPrimordialSCM() throws Exception {
init();
StorageContainerManager scm1 = cluster.getStorageContainerManagers().get(0);
StorageContainerManager scm2 = cluster.getStorageContainerManagers().get(1);
OzoneConfiguration conf1 = scm1.getConfiguration();
Expand All @@ -264,6 +274,7 @@ public void testPrimordialSCM() throws Exception {

@Test
public void testBootStrapSCM() throws Exception {
init();
StorageContainerManager scm2 = cluster.getStorageContainerManagers().get(1);
OzoneConfiguration conf2 = scm2.getConfiguration();
boolean isDeleted = scm2.getScmStorageConfig().getVersionFile().delete();
Expand Down Expand Up @@ -323,4 +334,72 @@ private void waitForLeaderToBeReady()
}, 1000, (int) ScmConfigKeys
.OZONE_SCM_HA_RATIS_LEADER_READY_WAIT_TIMEOUT_DEFAULT);
}

/**
 * Starts all SCM instances of an HA cluster concurrently and verifies that exactly one of them
 * reports leader state {@code 1} through its SCM HA metrics.
 *
 * @throws IOException if building the cluster or stopping an SCM fails
 * @throws InterruptedException if interrupted while waiting for the SCM start threads
 */
@Test
public void testSCMLeadershipMetric() throws IOException, InterruptedException {
  // GIVEN
  int scmInstancesCount = 3;
  conf = new OzoneConfiguration();
  MiniOzoneHAClusterImpl.Builder haMiniClusterBuilder = MiniOzoneCluster.newHABuilder(conf)
      .setSCMServiceId("scm-service-id")
      .setOMServiceId("om-service-id")
      .setNumOfActiveOMs(0)
      .setNumOfStorageContainerManagers(scmInstancesCount)
      .setNumOfActiveSCMs(1);
  haMiniClusterBuilder.setNumDatanodes(0);

  // start single SCM instance without other Ozone services
  // in order to initialize and bootstrap SCM instances only
  cluster = haMiniClusterBuilder.build();

  List<StorageContainerManager> storageContainerManagersList = cluster.getStorageContainerManagersList();

  // stop the single SCM instance in order to imitate further simultaneous start of SCMs
  storageContainerManagersList.get(0).stop();
  storageContainerManagersList.get(0).join();

  // WHEN (imitate simultaneous start of the SCMs)
  int retryCount = 0;
  while (true) {
    // Fresh latch and failure counter for every attempt.
    CountDownLatch scmInstancesCounter = new CountDownLatch(scmInstancesCount);
    AtomicInteger failedSCMs = new AtomicInteger();
    for (StorageContainerManager scm : storageContainerManagersList) {
      new Thread(() -> {
        try {
          scm.start();
        } catch (IOException e) {
          // Keep the cause visible; the attempt is retried below.
          LOG.warn("Failed to start StorageContainerManager", e);
          failedSCMs.incrementAndGet();
        } finally {
          scmInstancesCounter.countDown();
        }
      }).start();
    }
    scmInstancesCounter.await();
    if (failedSCMs.get() == 0) {
      break;
    }
    // At least one SCM failed to start: stop all of them and try again.
    for (StorageContainerManager scm : storageContainerManagersList) {
      scm.stop();
      scm.join();
      LOG.info("Stopping StorageContainerManager server at {}",
          scm.getClientRpcAddress());
    }
    ++retryCount;
    LOG.info("SCMs port conflicts, retried {} times",
        retryCount);
  }

  // THEN expect only one SCM node (leader) will have 'scmha_metrics_scmha_leader_state' metric set to 1.
  // Every SCM is inspected (no early exit) so that more than one reported leader fails the assertion.
  int leaderCount = 0;
  for (StorageContainerManager scm : storageContainerManagersList) {
    if (scm.getScmHAMetrics() != null && scm.getScmHAMetrics().getSCMHAMetricsInfoLeaderState() == 1) {
      leaderCount++;
    }
  }
  assertEquals(1, leaderCount);
}

}