diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java index b9539684ed0c..85664dd232fa 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java @@ -23,6 +23,7 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.UUID; import com.google.common.base.Preconditions; import com.google.protobuf.InvalidProtocolBufferException; @@ -170,6 +171,8 @@ private class RatisServerStub implements SCMRatisServer { private Map handlers = new EnumMap<>(RequestType.class); + private RaftPeerId leaderId = RaftPeerId.valueOf(UUID.randomUUID().toString()); + @Override public void start() { } @@ -283,5 +286,10 @@ public boolean removeSCM(RemoveSCMRequest request) throws IOException { public GrpcTlsConfig getGrpcTlsConfig() { return null; } + + @Override + public RaftPeerId getLeaderId() { + return leaderId; + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java index a786bd2944f5..4e883b27a7dd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.scm.AddSCMRequest; import org.apache.hadoop.hdds.scm.RemoveSCMRequest; import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.server.RaftServer; @@ -68,4 +69,6 @@ SCMRatisResponse submitRequest(SCMRatisRequest request) GrpcTlsConfig getGrpcTlsConfig(); + RaftPeerId getLeaderId(); + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java index 70dffba27ec0..0383bf180959 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import jakarta.annotation.Nullable; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -147,6 +148,16 @@ public GrpcTlsConfig getGrpcTlsConfig() { return grpcTlsConfig; } + @Override + @Nullable + public RaftPeerId getLeaderId() { + RaftPeer raftLeaderPeer = getLeader(); + if (raftLeaderPeer != null) { + return raftLeaderPeer.getId(); + } + return null; + } + private static void waitForLeaderToBeReady(RaftServer server, OzoneConfiguration conf, RaftGroup group) throws IOException { boolean ready; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 9390318a29ff..5895ecc12b39 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -1595,8 +1595,9 @@ public void start() throws IOException { setStartTime(); - // At this point leader is not known - scmHAMetricsUpdate(null); + RaftPeerId leaderId = SCMHAUtils.isSCMHAEnabled(configuration) + ? getScmHAManager().getRatisServer().getLeaderId() : null; + scmHAMetricsUpdate(Objects.toString(leaderId, null)); if (scmCertificateClient != null) { // In case root CA certificate is rotated during this SCM is offline @@ -2298,7 +2299,6 @@ public void scmHAMetricsUpdate(String leaderId) { // unregister, in case metrics already exist // so that the metric tags will get updated. SCMHAMetrics.unRegister(); - scmHAMetrics = SCMHAMetrics.create(getScmId(), leaderId); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java index a5a2054a8ae7..049f38480d81 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.RemoveSCMRequest; import org.apache.hadoop.hdds.scm.container.ContainerStateManager; import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.server.RaftServer; import org.junit.jupiter.api.BeforeEach; @@ -31,6 +32,7 @@ import java.io.IOException; import java.lang.reflect.Proxy; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutionException; import static org.assertj.core.api.Assertions.assertThat; @@ -111,6 +113,11 @@ public SCMStateMachine getSCMStateMachine() { public GrpcTlsConfig getGrpcTlsConfig() { return null; } + + @Override + public RaftPeerId getLeaderId() { + return RaftPeerId.valueOf(UUID.randomUUID().toString()); + } }; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisServerImpl.java new file mode 100644 index 000000000000..6919ce41ed1c --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisServerImpl.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.security.SecurityConfig; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; + +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** + * Test for SCM Ratis Server Implementation. + */ +public class TestSCMRatisServerImpl { + + @Test + public void testGetLeaderId() throws Exception { + + try ( + MockedConstruction mockedSecurityConfigConstruction = mockConstruction(SecurityConfig.class); + MockedStatic staticMockedRaftServer = mockStatic(RaftServer.class); + MockedStatic staticMockedRatisUtil = mockStatic(RatisUtil.class); + ) { + // given + ConfigurationSource conf = mock(ConfigurationSource.class); + StorageContainerManager scm = mock(StorageContainerManager.class); + String clusterId = "CID-" + UUID.randomUUID(); + when(scm.getClusterId()).thenReturn(clusterId); + SCMHADBTransactionBuffer dbTransactionBuffer = mock(SCMHADBTransactionBuffer.class); + + RaftServer.Builder raftServerBuilder = mock(RaftServer.Builder.class); + when(raftServerBuilder.setServerId(any())).thenReturn(raftServerBuilder); + when(raftServerBuilder.setProperties(any())).thenReturn(raftServerBuilder); + when(raftServerBuilder.setStateMachineRegistry(any())).thenReturn(raftServerBuilder); + when(raftServerBuilder.setOption(any())).thenReturn(raftServerBuilder); + when(raftServerBuilder.setGroup(any())).thenReturn(raftServerBuilder); + when(raftServerBuilder.setParameters(any())).thenReturn(raftServerBuilder); + + RaftServer raftServer = mock(RaftServer.class); + + RaftServer.Division division = mock(RaftServer.Division.class); + when(raftServer.getDivision(any())).thenReturn(division); + + SCMStateMachine scmStateMachine = mock(SCMStateMachine.class); + when(division.getStateMachine()).thenReturn(scmStateMachine); + + when(raftServerBuilder.build()).thenReturn(raftServer); + + staticMockedRaftServer.when(RaftServer::newBuilder).thenReturn(raftServerBuilder); + + RaftProperties raftProperties = mock(RaftProperties.class); + staticMockedRatisUtil.when(() -> RatisUtil.newRaftProperties(conf)).thenReturn(raftProperties); + + SecurityConfig sc = new SecurityConfig(conf); + when(sc.isSecurityEnabled()).thenReturn(false); + + SCMRatisServerImpl scmRatisServer = spy(new SCMRatisServerImpl(conf, scm, dbTransactionBuffer)); + doReturn(RaftPeer.newBuilder().setId(RaftPeerId.valueOf("peer1")).build()).when(scmRatisServer).getLeader(); + + // when + RaftPeerId leaderId = scmRatisServer.getLeaderId(); + + // then + assertEquals(RaftPeerId.valueOf("peer1"), leaderId); + + // but when + doReturn(null).when(scmRatisServer).getLeader(); + leaderId = scmRatisServer.getLeaderId(); + + // then + assertNull(leaderId); + } + } + +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHA.java index 2986484d2ad0..2f9c8c938a31 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHA.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.ha.SCMHAMetrics; +import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -43,9 +44,10 @@ import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.statemachine.SnapshotInfo; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Instant; @@ -54,7 +56,9 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; @@ -72,6 +76,8 @@ @Timeout(300) public class TestStorageContainerManagerHA { + private static final Logger LOG = LoggerFactory.getLogger(TestStorageContainerManagerHA.class); + private MiniOzoneHAClusterImpl cluster = null; private OzoneConfiguration conf; private String omServiceId; @@ -86,7 +92,6 @@ public class TestStorageContainerManagerHA { * * @throws IOException */ - @BeforeEach public void init() throws Exception { conf = new OzoneConfiguration(); conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s"); @@ -118,6 +123,7 @@ public void shutdown() { @Test void testAllSCMAreRunning() throws Exception { + init(); int count = 0; List scms = cluster.getStorageContainerManagers(); assertEquals(numOfSCMs, scms.size()); @@ -129,6 +135,9 @@ void testAllSCMAreRunning() throws Exception { count++; leaderScm = scm; } + if (SCMHAUtils.isSCMHAEnabled(conf)) { + assertNotNull(scm.getScmHAManager().getRatisServer().getLeaderId()); + } assertEquals(peerSize, numOfSCMs); } assertEquals(1, count); @@ -246,6 +255,7 @@ private boolean areAllScmInSync(long leaderIndex) { @Test public void testPrimordialSCM() throws Exception { + init(); StorageContainerManager scm1 = cluster.getStorageContainerManagers().get(0); StorageContainerManager scm2 = cluster.getStorageContainerManagers().get(1); OzoneConfiguration conf1 = scm1.getConfiguration(); @@ -264,6 +274,7 @@ public void testPrimordialSCM() throws Exception { @Test public void testBootStrapSCM() throws Exception { + init(); StorageContainerManager scm2 = cluster.getStorageContainerManagers().get(1); OzoneConfiguration conf2 = scm2.getConfiguration(); boolean isDeleted = scm2.getScmStorageConfig().getVersionFile().delete(); @@ -323,4 +334,72 @@ private void waitForLeaderToBeReady() }, 1000, (int) ScmConfigKeys .OZONE_SCM_HA_RATIS_LEADER_READY_WAIT_TIMEOUT_DEFAULT); } + + @Test + public void testSCMLeadershipMetric() throws IOException, InterruptedException { + // GIVEN + int scmInstancesCount = 3; + conf = new OzoneConfiguration(); + MiniOzoneHAClusterImpl.Builder haMiniClusterBuilder = MiniOzoneCluster.newHABuilder(conf) + .setSCMServiceId("scm-service-id") + .setOMServiceId("om-service-id") + .setNumOfActiveOMs(0) + .setNumOfStorageContainerManagers(scmInstancesCount) + .setNumOfActiveSCMs(1); + haMiniClusterBuilder.setNumDatanodes(0); + + // start single SCM instance without other Ozone services + // in order to initialize and bootstrap SCM instances only + cluster = haMiniClusterBuilder.build(); + + List storageContainerManagersList = cluster.getStorageContainerManagersList(); + + // stop the single SCM instance in order to imitate further simultaneous start of SCMs + storageContainerManagersList.get(0).stop(); + storageContainerManagersList.get(0).join(); + + // WHEN (imitate simultaneous start of the SCMs) + int retryCount = 0; + while (true) { + CountDownLatch scmInstancesCounter = new CountDownLatch(scmInstancesCount); + AtomicInteger failedSCMs = new AtomicInteger(); + for (StorageContainerManager scm : storageContainerManagersList) { + new Thread(() -> { + try { + scm.start(); + } catch (IOException e) { + failedSCMs.incrementAndGet(); + } finally { + scmInstancesCounter.countDown(); + } + }).start(); + } + scmInstancesCounter.await(); + if (failedSCMs.get() == 0) { + break; + } else { + for (StorageContainerManager scm : storageContainerManagersList) { + scm.stop(); + scm.join(); + LOG.info("Stopping StorageContainerManager server at {}", + scm.getClientRpcAddress()); + } + ++retryCount; + LOG.info("SCMs port conflicts, retried {} times", + retryCount); + failedSCMs.set(0); + } + } + + // THEN expect only one SCM node (leader) will have 'scmha_metrics_scmha_leader_state' metric set to 1 + int leaderCount = 0; + for (StorageContainerManager scm : storageContainerManagersList) { + if (scm.getScmHAMetrics() != null && scm.getScmHAMetrics().getSCMHAMetricsInfoLeaderState() == 1) { + leaderCount++; + break; + } + } + assertEquals(1, leaderCount); + } + }