diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 09245b2bee7a..89c2d16d7561 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -115,7 +115,7 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp /** * SCMContext from StorageContainerManager. */ - private final SCMContext scmContext; + private SCMContext scmContext; /** @@ -918,6 +918,10 @@ public ReplicationManagerReport getContainerReport() { return containerReport; } + public boolean isThreadWaiting() { + return replicationMonitor.getState() == Thread.State.TIMED_WAITING; + } + /** * ReplicationMonitor thread runnable. This wakes up at configured * interval and processes all the containers in the system. @@ -1410,6 +1414,11 @@ public boolean shouldRun() { } } + @VisibleForTesting + public void setScmContext(SCMContext context) { + scmContext = context; + } + @Override public String getServiceName() { return ReplicationManager.class.getSimpleName(); @@ -1492,5 +1501,36 @@ public boolean hasHealthyPipeline(ContainerInfo container) { return false; } } + + /** + * Notify the ReplicationManager that a node state has changed, which might + * require container replication. This will wake up the replication monitor + * thread if it's sleeping and there's no active replication work in progress. + * + * @return true if the replication monitor was woken up, false otherwise + */ + public synchronized boolean notifyNodeStateChange() { + if (!running || serviceStatus == ServiceStatus.PAUSING) { + return false; + } + + if (!isThreadWaiting()) { + LOG.debug("Replication monitor is running, not need to wake it up"); + return false; + } + + // Only wake up the thread if there's no active replication work + // This prevents creating a new replication queue over and over + // when multiple nodes change state in quick succession + if (getQueue().isEmpty()) { + LOG.debug("Waking up replication monitor due to node state change"); + // Notify the replication monitor thread to wake up + notify(); + return true; + } else { + LOG.debug("Replication queue is not empty, not waking up replication monitor"); + return false; + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java new file mode 100644 index 000000000000..c63c44596d82 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerEventHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles events related to the ReplicationManager. + */ +public class ReplicationManagerEventHandler implements EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationManagerEventHandler.class); + + private final ReplicationManager replicationManager; + private final SCMContext scmContext; + + public ReplicationManagerEventHandler(ReplicationManager replicationManager, SCMContext scmContext) { + this.replicationManager = replicationManager; + this.scmContext = scmContext; + } + + @Override + public void onMessage(DatanodeDetails datanodeDetails, EventPublisher eventPublisher) { + if (!scmContext.isLeaderReady() || scmContext.isInSafeMode()) { + // same condition in ReplicationManager + return; + } + LOG.debug("ReplicationManagerEventHandler received event for datanode: {}", datanodeDetails); + replicationManager.notifyNodeStateChange(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java index 9e17d6747936..ee869515ced2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java @@ -97,4 +97,8 @@ public int overReplicatedQueueSize() { return overRepQueue.size(); } + public boolean isEmpty() { + return underRepQueue.isEmpty() && overRepQueue.isEmpty(); + } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index b72f786acd10..797a6dfd613e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -212,6 +212,10 @@ public final class SCMEvents { new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class, "Delete_Block_Status"); + public static final TypedEvent + REPLICATION_MANAGER_NOTIFY = + new TypedEvent<>(DatanodeDetails.class, "Replication_Manager_Notify"); + /** * Private Ctor. Never Constructed. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 69de282e81e1..9f69d9456d5d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -91,12 +92,21 @@ public void onMessage(final DatanodeDetails datanodeDetails, closeContainers(datanodeDetails, publisher); destroyPipelines(datanodeDetails); + boolean isNodeInMaintenance = nodeManager.getNodeStatus(datanodeDetails).isInMaintenance(); + // Remove the container replicas associated with the dead node unless it // is IN_MAINTENANCE - if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) { + if (!isNodeInMaintenance) { removeContainerReplicas(datanodeDetails); } - + + // Notify ReplicationManager + if (!isNodeInMaintenance) { + LOG.debug("Notifying ReplicationManager about dead node: {}", + datanodeDetails); + publisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanodeDetails); + } + // remove commands in command queue for the DN final List> cmdList = nodeManager.getCommandQueue( datanodeDetails.getUuid()); @@ -105,8 +115,7 @@ public void onMessage(final DatanodeDetails datanodeDetails, // remove DeleteBlocksCommand associated with the dead node unless it // is IN_MAINTENANCE - if (deletedBlockLog != null && - !nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) { + if (deletedBlockLog != null && !isNodeInMaintenance) { deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid()); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 5d72d7428f5b..f950e8719e4c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -636,9 +636,34 @@ protected void updateDatanodeOpState(DatanodeDetails reportedDn) } } DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn); + NodeOperationalState oldPersistedOpState = scmDnd.getPersistedOpState(); + NodeOperationalState newPersistedOpState = reportedDn.getPersistedOpState(); + scmDnd.setPersistedOpStateExpiryEpochSec( reportedDn.getPersistedOpStateExpiryEpochSec()); - scmDnd.setPersistedOpState(reportedDn.getPersistedOpState()); + scmDnd.setPersistedOpState(newPersistedOpState); + + maybeNotifyReplicationManager(reportedDn, oldPersistedOpState, newPersistedOpState); + } + + private void maybeNotifyReplicationManager( + DatanodeDetails datanode, + NodeOperationalState oldState, + NodeOperationalState newState) { + if (!scmContext.isLeader()) { + return; + } + + if (oldState != newState) { + // Notify when a node is entering maintenance, decommissioning or back to service + if (newState == NodeOperationalState.ENTERING_MAINTENANCE + || newState == NodeOperationalState.DECOMMISSIONING + || newState == NodeOperationalState.IN_SERVICE) { + LOG.info("Notifying ReplicationManager of node state change for {}: {} -> {}", + datanode, oldState, newState); + scmNodeEventPublisher.fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode); + } + } } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 3d1807a8198f..1e1eb4cbe2eb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerEventHandler; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; @@ -495,11 +496,16 @@ private void initializeEventHandlers() { PipelineActionHandler pipelineActionHandler = new PipelineActionHandler(pipelineManager, scmContext); + ReplicationManagerEventHandler replicationManagerEventHandler = + new ReplicationManagerEventHandler(replicationManager, scmContext); + eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND_COUNT_UPDATED, new DatanodeCommandCountUpdatedHandler(replicationManager)); + eventQueue.addHandler(SCMEvents.REPLICATION_MANAGER_NOTIFY, + replicationManagerEventHandler); // Use the same executor for both ICR and FCR. // The Executor maps the event to a thread for DN. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 7555e1ab88ba..935d371bd0d4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -61,6 +61,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.ECReplicationConfig; @@ -97,6 +99,7 @@ import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Lists; +import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.TestClock; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.junit.jupiter.api.AfterEach; @@ -104,6 +107,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; /** @@ -1648,6 +1652,62 @@ public void testPendingOpExpiry() throws ContainerNotFoundException { assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline()); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testNotifyNodeStateChangeWakesUpThread(boolean queueIsEmpty) + throws IOException, InterruptedException, ReflectiveOperationException, TimeoutException { + + AtomicBoolean processAllCalled = new AtomicBoolean(false); + ReplicationQueue queue = mock(ReplicationQueue.class); + when(queue.isEmpty()).thenReturn(queueIsEmpty); + final ReplicationManager customRM = new ReplicationManager( + configuration, + containerManager, + ratisPlacementPolicy, + ecPlacementPolicy, + eventPublisher, + scmContext, + nodeManager, + clock, + containerReplicaPendingOps) { + @Override + public ReplicationQueue getQueue() { + return queue; + } + + @Override + public synchronized void processAll() { + processAllCalled.set(true); + } + }; + + customRM.notifyStatusChanged(); + customRM.start(); + + // wait for the thread become TIMED_WAITING + GenericTestUtils.waitFor( + () -> customRM.isThreadWaiting(), + 100, + 1000); + + // The processAll method will be called when the ReplicationManager's run + // method is executed by the replicationMonitor thread. + assertTrue(processAllCalled.get()); + processAllCalled.set(false); + + assertThat(customRM.notifyNodeStateChange()).isEqualTo(queueIsEmpty); + + GenericTestUtils.waitFor( + () -> customRM.isThreadWaiting(), + 100, + 1000); + + // If the queue is empty, the processAll method should have been called + assertEquals(processAllCalled.get(), queueIsEmpty); + + customRM.stop(); + } + @SafeVarargs private final Set addReplicas(ContainerInfo container, ContainerReplicaProto.State replicaState, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java new file mode 100644 index 000000000000..480c873cbd0e --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerEventHandler.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.stream.Stream; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Test the ReplicationManagerEventHandler class. + */ +public class TestReplicationManagerEventHandler { + private ReplicationManager replicationManager; + private ReplicationManagerEventHandler replicationManagerEventHandler; + private EventPublisher publisher; + private SCMContext scmContext; + + @BeforeEach + public void setUp() { + replicationManager = mock(ReplicationManager.class); + publisher = mock(EventPublisher.class); + scmContext = mock(SCMContext.class); + replicationManagerEventHandler = new ReplicationManagerEventHandler(replicationManager, scmContext); + } + + private static Stream testData() { + return Stream.of( + Arguments.of(true, false, true), + Arguments.of(false, true, false), + Arguments.of(true, true, false), + Arguments.of(false, false, false) + ); + } + + @ParameterizedTest + @MethodSource("testData") + public void testReplicationManagerEventHandler(boolean isLeaderReady, boolean isInSafeMode, + boolean isExpectedToNotify) { + when(scmContext.isLeaderReady()).thenReturn(isLeaderReady); + when(scmContext.isInSafeMode()).thenReturn(isInSafeMode); + DatanodeDetails dataNodeDetails = MockDatanodeDetails.randomDatanodeDetails(); + replicationManagerEventHandler.onMessage(dataNodeDetails, publisher); + + verify(replicationManager, times(isExpectedToNotify ? 1 : 0)).notifyNodeStateChange(); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index aedf64f926de..674fe2b97208 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -107,6 +108,7 @@ public void setup() throws IOException, AuthenticationException { pipelineManager = (PipelineManagerImpl)scm.getPipelineManager(); pipelineManager.setScmContext(scmContext); + scm.getReplicationManager().setScmContext(scmContext); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), conf); @@ -230,6 +232,10 @@ public void testOnMessage(@TempDir File tempDir) throws Exception { assertFalse( nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); + verify(publisher, times(0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode1); + + clearInvocations(publisher); + verify(deletedBlockLog, times(0)) .onDatanodeDead(datanode1.getUuid()); @@ -259,8 +265,8 @@ public void testOnMessage(@TempDir File tempDir) throws Exception { nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); assertEquals(0, nodeManager.getCommandQueueCount(datanode1.getUuid(), cmd.getType())); - verify(deletedBlockLog, times(1)) - .onDatanodeDead(datanode1.getUuid()); + verify(publisher).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode1); + verify(deletedBlockLog).onDatanodeDead(datanode1.getUuid()); container1Replicas = containerManager .getContainerReplicas(ContainerID.valueOf(container1.getContainerID())); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index cb2315f7fd56..89b1cbd3e191 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -86,6 +86,7 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -2038,4 +2039,64 @@ public void testScmRegisterNodeWithUpdatedIpAndHostname() assertEquals(emptyList(), nodeManager.getNodesByAddress(ipAddress)); } } + + private static Stream nodeStateTransitions() { + return Stream.of( + // start decommissioning or entering maintenance + Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE, + HddsProtos.NodeOperationalState.DECOMMISSIONING, true), + Arguments.of(HddsProtos.NodeOperationalState.IN_SERVICE, + HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, true), + // back to service (DataNodeAdminMonitor abort workflow, maintenance end time expired or node is dead) + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING, + HddsProtos.NodeOperationalState.IN_SERVICE, true), + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONED, + HddsProtos.NodeOperationalState.IN_SERVICE, true), + Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, + HddsProtos.NodeOperationalState.IN_SERVICE, true), + Arguments.of(HddsProtos.NodeOperationalState.IN_MAINTENANCE, + HddsProtos.NodeOperationalState.IN_SERVICE, true), + // there is no under/over replicated containers on the node, completed the admin workflow + Arguments.of(HddsProtos.NodeOperationalState.DECOMMISSIONING, + HddsProtos.NodeOperationalState.DECOMMISSIONED, false), + Arguments.of(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, + HddsProtos.NodeOperationalState.IN_MAINTENANCE, false) + ); + } + + @ParameterizedTest + @MethodSource("nodeStateTransitions") + public void testNodeOperationalStateChange( + HddsProtos.NodeOperationalState oldState, + HddsProtos.NodeOperationalState newState, + boolean shouldNotify) + throws IOException, NodeNotFoundException, AuthenticationException { + OzoneConfiguration conf = new OzoneConfiguration(); + SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class); + when(scmStorageConfig.getClusterID()).thenReturn("xyz111"); + EventPublisher eventPublisher = mock(EventPublisher.class); + HDDSLayoutVersionManager lvm = new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion()); + createNodeManager(getConf()); + SCMNodeManager nodeManager = new SCMNodeManager(conf, + scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), + scmContext, lvm); + + DatanodeDetails datanode = MockDatanodeDetails.randomDatanodeDetails(); + datanode.setPersistedOpState(oldState); + nodeManager.register(datanode, null, HddsTestUtils.getRandomPipelineReports()); + + nodeManager.setNodeOperationalState(datanode, newState, 0); + verify(eventPublisher, times(0)).fireEvent(SCMEvents.REPLICATION_MANAGER_NOTIFY, datanode); + + DatanodeDetails reportedDatanode = MockDatanodeDetails.createDatanodeDetails( + datanode.getUuid()); + reportedDatanode.setPersistedOpState(newState); + + nodeManager.processHeartbeat(reportedDatanode); + + verify(eventPublisher, times(shouldNotify ? 1 : 0)).fireEvent( + SCMEvents.REPLICATION_MANAGER_NOTIFY, reportedDatanode); + + nodeManager.close(); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java new file mode 100644 index 000000000000..8556a01e5333 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerIntegration.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.getDNHostAndPort; +import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachHealthState; +import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachOpState; +import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachPersistedOpState; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneTestUtils; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; +import org.apache.hadoop.ozone.client.OzoneKeyLocation; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for ReplicationManager. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class TestReplicationManagerIntegration { + private static final int DATANODE_COUNT = 5; + private static final int HEALTHY_REPLICA_NUM = 3; + private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG = RatisReplicationConfig + .getInstance(HddsProtos.ReplicationFactor.THREE); + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationManagerIntegration.class); + + private MiniOzoneCluster cluster; + private NodeManager nodeManager; + private ContainerManager containerManager; + private ReplicationManager replicationManager; + private StorageContainerManager scm; + private OzoneClient client; + private ContainerOperationClient scmClient; + private OzoneBucket bucket; + + @BeforeAll + void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 100, MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 100, MILLISECONDS); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1); + conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 100, MILLISECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100, MILLISECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, MILLISECONDS); + conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 100, MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 2, SECONDS); + conf.setTimeDuration(OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL, + 1, SECONDS); + conf.setTimeDuration( + ScmConfigKeys.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL, + 1, SECONDS); + conf.setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, + 0, SECONDS); + conf.set(OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s"); + conf.set(OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s"); + conf.set(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s"); + + ReplicationManagerConfiguration replicationConf = conf.getObject(ReplicationManagerConfiguration.class); + replicationConf.setInterval(Duration.ofSeconds(1)); + replicationConf.setUnderReplicatedInterval(Duration.ofMillis(100)); + replicationConf.setOverReplicatedInterval(Duration.ofMillis(100)); + conf.setFromObject(replicationConf); + + MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(DATANODE_COUNT); + + cluster = builder.build(); + cluster.getConf().setTimeDuration(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, 0, SECONDS); + cluster.waitForClusterToBeReady(); + + scm = cluster.getStorageContainerManager(); + nodeManager = scm.getScmNodeManager(); + containerManager = scm.getContainerManager(); + replicationManager = scm.getReplicationManager(); + + client = cluster.newClient(); + scmClient = new ContainerOperationClient(cluster.getConf()); + bucket = TestDataUtil.createVolumeAndBucket(client); + } + + @AfterAll + void shutdown() { + IOUtils.close(LOG, client, scmClient, cluster); + } + + @Order(1) + @Test + void testReplicationManagerNotify() throws Exception { + // Test if RM notify works + replicationManager.getConfig().setInterval(Duration.ofSeconds(300)); + GenericTestUtils.waitFor(() -> replicationManager.isThreadWaiting(), 200, 30000); + } + + @Order(Integer.MAX_VALUE) + @Test + public void testClosedContainerReplicationWhenNodeDies() + throws Exception { + String keyName = "key-" + UUID.randomUUID(); + TestDataUtil.createKey(bucket, keyName, RATIS_REPLICATION_CONFIG, + "this is the content".getBytes(StandardCharsets.UTF_8)); + + // Get the container ID for the key + OzoneKeyDetails keyDetails = bucket.getKey(keyName); + List keyLocations = keyDetails.getOzoneKeyLocations(); + long containerID = keyLocations.get(0).getContainerID(); + ContainerID containerId = ContainerID.valueOf(containerID); + // open container would not be handled to do any further processing in RM + OzoneTestUtils.closeContainer(scm, containerManager.getContainer(containerId)); + + assertEquals(HEALTHY_REPLICA_NUM, containerManager.getContainerReplicas(containerId).size()); + + final DatanodeDetails targetDatanode = findReplica(containerId); + + cluster.shutdownHddsDatanode(targetDatanode); + waitForDnToReachHealthState(nodeManager, targetDatanode, DEAD); + + // Check if the replicas nodes don't contain dead one + // and the replica of container replica num is considered to be healthy + GenericTestUtils.waitFor(() -> { + try { + Set replicas = containerManager.getContainerReplicas(containerId); + boolean deadNodeNotInContainerReplica = replicas.stream() + .noneMatch(r -> r.getDatanodeDetails().equals(targetDatanode)); + boolean hasHealthyReplicaNum = replicas.size() == HEALTHY_REPLICA_NUM; + return deadNodeNotInContainerReplica && hasHealthyReplicaNum; + } catch (ContainerNotFoundException e) { + return false; + } + }, 100, 30000); + } + + private DatanodeDetails findReplica(ContainerID containerId) throws ContainerNotFoundException { + // Find a datanode that has a replica of this container + return containerManager.getContainerReplicas(containerId).stream() + .findFirst() + .orElseThrow(() -> new AssertionError("Replica not found for " + containerId)) + .getDatanodeDetails(); + } + + @ParameterizedTest + @EnumSource(value = NodeOperationalState.class, names = {"IN_MAINTENANCE", "DECOMMISSIONED"}) + void testClosedContainerReplicationWhenNodeDecommissionAndBackToInService( + NodeOperationalState expectedOpState) + throws Exception { + + String keyName = "key-" + UUID.randomUUID(); + TestDataUtil.createKey(bucket, keyName, RATIS_REPLICATION_CONFIG, + "this is the content".getBytes(StandardCharsets.UTF_8)); + + OzoneKeyDetails key = bucket.getKey(keyName); + List keyLocations = key.getOzoneKeyLocations(); + + long containerID = keyLocations.get(0).getContainerID(); + ContainerID containerId = ContainerID.valueOf(containerID); + ContainerInfo containerInfo = containerManager.getContainer(containerId); + OzoneTestUtils.closeContainer(scm, containerInfo); + + assertEquals(containerManager.getContainerReplicas(containerId).size(), HEALTHY_REPLICA_NUM); + + DatanodeDetails datanode = findReplica(containerId); + + if (expectedOpState == IN_MAINTENANCE) { + scmClient.startMaintenanceNodes(Collections.singletonList(getDNHostAndPort(datanode)), 0, false); + waitForDnToReachOpState(nodeManager, datanode, expectedOpState); + assertEquals(containerManager.getContainerReplicas(containerId).size(), + HEALTHY_REPLICA_NUM); + } else { + scmClient.decommissionNodes(Collections.singletonList(getDNHostAndPort(datanode)), false); + waitForDnToReachOpState(nodeManager, datanode, expectedOpState); + // decommissioning node would be excluded + assertEquals(containerManager.getContainerReplicas(containerId).size(), + HEALTHY_REPLICA_NUM + 1); + } + + // bring the node back to service + scmClient.recommissionNodes(Collections.singletonList(getDNHostAndPort(datanode))); + + waitForDnToReachOpState(nodeManager, datanode, IN_SERVICE); + waitForDnToReachPersistedOpState(datanode, IN_SERVICE); + + GenericTestUtils.waitFor(() -> { + try { + return containerManager.getContainerReplicas(containerId).size() == HEALTHY_REPLICA_NUM; + } catch (Exception e) { + return false; + } + }, 200, 30000); + } +}