diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java index 34665477516a..bd224e45a79b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java @@ -30,5 +30,5 @@ public interface DatanodeAdminMonitor extends Runnable { void startMonitoring(DatanodeDetails dn); void stopMonitoring(DatanodeDetails dn); Set<DatanodeDetails> getTrackedNodes(); - + void setMetrics(NodeDecommissionMetrics metrics); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java index e676fc1cede8..4c446acce7ff 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.node.NodeDecommissionMetrics.ContainerStateInWorkflow; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -38,8 +39,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; @@ -74,6 +77,15 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor { private Queue<DatanodeDetails> pendingNodes = new ArrayDeque(); private Queue<DatanodeDetails> cancelledNodes = new ArrayDeque(); private Set<DatanodeDetails> trackedNodes = new HashSet<>(); + private NodeDecommissionMetrics metrics; + private long pipelinesWaitingToClose = 0; + private long sufficientlyReplicatedContainers = 0; + private long trackedDecomMaintenance = 0; + private long trackedRecommission = 0; + private long unhealthyContainers = 0; + private long underReplicatedContainers = 0; + + private Map<String, ContainerStateInWorkflow> containerStateByHost; private static final Logger LOG = LoggerFactory.getLogger(DatanodeAdminMonitorImpl.class); @@ -90,6 +102,8 @@ public DatanodeAdminMonitorImpl( this.eventQueue = eventQueue; this.nodeManager = nodeManager; this.replicationManager = replicationManager; + + containerStateByHost = new HashMap<>(); } /** @@ -117,6 +131,10 @@ public synchronized void stopMonitoring(DatanodeDetails dn) { cancelledNodes.add(dn); } + public synchronized void setMetrics(NodeDecommissionMetrics metrics) { + this.metrics = metrics; + } + /** * Get the set of nodes which are currently tracked in the decommissioned * and maintenance workflow.
@@ -139,16 +157,20 @@ public synchronized Set getTrackedNodes() { @Override public void run() { try { + containerStateByHost.clear(); synchronized (this) { + trackedRecommission = getCancelledCount(); processCancelledNodes(); processPendingNodes(); + trackedDecomMaintenance = getTrackedNodeCount(); } processTransitioningNodes(); if (trackedNodes.size() > 0 || pendingNodes.size() > 0) { LOG.info("There are {} nodes tracked for decommission and " + - "maintenance. {} pending nodes.", + "maintenance. {} pending nodes.", trackedNodes.size(), pendingNodes.size()); } + setMetricsToGauge(); } catch (Exception e) { LOG.error("Caught an error in the DatanodeAdminMonitor", e); // Intentionally do not re-throw, as if we do the monitor thread @@ -168,6 +190,28 @@ public int getTrackedNodeCount() { return trackedNodes.size(); } + synchronized void setMetricsToGauge() { + synchronized (metrics) { + metrics.setContainersUnhealthyTotal(unhealthyContainers); + metrics.setRecommissionNodesTotal(trackedRecommission); + metrics.setDecommissioningMaintenanceNodesTotal( + trackedDecomMaintenance); + metrics.setContainersUnderReplicatedTotal( + underReplicatedContainers); + metrics.setContainersSufficientlyReplicatedTotal( + sufficientlyReplicatedContainers); + metrics.setPipelinesWaitingToCloseTotal(pipelinesWaitingToClose); + metrics.metricRecordOfContainerStateByHost(containerStateByHost); + } + } + + void resetContainerMetrics() { + pipelinesWaitingToClose = 0; + sufficientlyReplicatedContainers = 0; + unhealthyContainers = 0; + underReplicatedContainers = 0; + } + private void processCancelledNodes() { while (!cancelledNodes.isEmpty()) { DatanodeDetails dn = cancelledNodes.poll(); @@ -188,7 +232,9 @@ private void processPendingNodes() { } private void processTransitioningNodes() { + resetContainerMetrics(); Iterator iterator = trackedNodes.iterator(); + while (iterator.hasNext()) { DatanodeDetails dn = iterator.next(); try { @@ -256,8 +302,8 @@ private boolean shouldContinueWorkflow(DatanodeDetails dn, NodeStatus nodeStatus) { if (!nodeStatus.isDecommission() && !nodeStatus.isMaintenance()) { LOG.warn("Datanode {} has an operational state of {} when it should " + - "be undergoing decommission or maintenance. Aborting admin for " + - "this node.", dn, nodeStatus.getOperationalState()); + "be undergoing decommission or maintenance. Aborting admin for " + + "this node.", dn, nodeStatus.getOperationalState()); return false; } if (nodeStatus.isDead() && !nodeStatus.isInMaintenance()) { @@ -278,6 +324,10 @@ private boolean checkPipelinesClosedOnNode(DatanodeDetails dn) } else { LOG.info("Waiting for pipelines to close for {}. 
There are {} " + "pipelines", dn, pipelines.size()); + containerStateByHost.put(dn.getHostName(), + new ContainerStateInWorkflow(dn.getHostName(), 0L, 0L, 0L, + pipelines.size())); + pipelinesWaitingToClose += pipelines.size(); return false; } } @@ -327,6 +377,15 @@ private boolean checkContainersReplicatedOnNode(DatanodeDetails dn) LOG.info("{} has {} sufficientlyReplicated, {} underReplicated and {} " + "unhealthy containers", dn, sufficientlyReplicated, underReplicated, unhealthy); + containerStateByHost.put(dn.getHostName(), + new ContainerStateInWorkflow(dn.getHostName(), + sufficientlyReplicated, + underReplicated, + unhealthy, + 0L)); + sufficientlyReplicatedContainers += sufficientlyReplicated; + underReplicatedContainers += underReplicated; + unhealthyContainers += unhealthy; if (LOG.isDebugEnabled() && underReplicatedIDs.size() < 10000 && unhealthyIDs.size() < 10000) { LOG.debug("{} has {} underReplicated [{}] and {} unhealthy [{}] " + diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java index 1ea04cdfc3ce..a84b07d51371 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java @@ -60,6 +60,9 @@ public class NodeDecommissionManager { private boolean useHostnames; private long monitorInterval; + // Decommissioning and Maintenance mode progress related metrics. + private NodeDecommissionMetrics metrics; + private static final Logger LOG = LoggerFactory.getLogger(NodeDecommissionManager.class); @@ -181,6 +184,7 @@ public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm, this.scmContext = scmContext; this.eventQueue = eventQueue; this.replicationManager = rm; + this.metrics = null; executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("DatanodeAdminManager-%d") @@ -208,7 +212,8 @@ public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm, monitor = new DatanodeAdminMonitorImpl(conf, eventQueue, nodeManager, replicationManager); - + this.metrics = NodeDecommissionMetrics.create(); + monitor.setMetrics(this.metrics); executor.scheduleAtFixedRate(monitor, monitorInterval, monitorInterval, TimeUnit.SECONDS); } @@ -373,6 +378,7 @@ public synchronized void startMaintenance(DatanodeDetails dn, int endInHours) * Stops the decommission monitor from running when SCM is shutdown. */ public void stop() { + metrics.unRegister(); if (executor != null) { executor.shutdown(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionMetrics.java new file mode 100644 index 000000000000..8be3d2a6d019 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionMetrics.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; + +import java.util.HashMap; +import java.util.Map; + +/** + * Class contains metrics related to the NodeDecommissionManager. + */ +@Metrics(about = "Node Decommission Metrics", context = OzoneConsts.OZONE) +public final class NodeDecommissionMetrics implements MetricsSource { + public static final String METRICS_SOURCE_NAME = + org.apache.hadoop.hdds.scm.node.NodeDecommissionMetrics + .class.getSimpleName(); + + @Metric("Number of nodes tracked for decommissioning and maintenance.") + private MutableGaugeLong decommissioningMaintenanceNodesTotal; + + @Metric("Number of nodes tracked for recommissioning.") + private MutableGaugeLong recommissionNodesTotal; + + @Metric("Number of nodes tracked with pipelines waiting to close.") + private MutableGaugeLong pipelinesWaitingToCloseTotal; + + @Metric("Number of containers under replicated in tracked nodes.") + private MutableGaugeLong containersUnderReplicatedTotal; + + @Metric("Number of containers unhealthy in tracked nodes.") + private MutableGaugeLong containersUnhealthyTotal; + + @Metric("Number of containers sufficiently replicated in tracked nodes.") + private MutableGaugeLong containersSufficientlyReplicatedTotal; + + /** + * Inner class for snapshot of Datanode ContainerState in + * Decommissioning and Maintenance mode workflow. 
+ */ + public static final class ContainerStateInWorkflow { + private long sufficientlyReplicated = 0; + private long unhealthyContainers = 0; + private long underReplicatedContainers = 0; + private String host = ""; + private long pipelinesWaitingToClose = 0; + + private static final MetricsInfo HOST_UNDER_REPLICATED = Interns.info( + "UnderReplicatedDN", + "Number of under-replicated containers " + + "for host in decommissioning and " + + "maintenance mode"); + + private static final MetricsInfo HOST_PIPELINES_TO_CLOSE = Interns.info( + "PipelinesWaitingToCloseDN", + "Number of pipelines waiting to close for " + + "host in decommissioning and " + + "maintenance mode"); + + private static final MetricsInfo HOST_SUFFICIENTLY_REPLICATED = Interns + .info( + "SufficientlyReplicatedDN", + "Number of sufficiently replicated containers " + + "for host in decommissioning and " + + "maintenance mode"); + + private static final MetricsInfo HOST_UNHEALTHY_CONTAINERS = Interns.info( + "UnhealthyContainersDN", + "Number of unhealthy containers " + + "for host in decommissioning and " + + "maintenance mode"); + + + public ContainerStateInWorkflow(String host, + long sufficiently, + long under, + long unhealthy, + long pipelinesToClose) { + this.host = host; + sufficientlyReplicated = sufficiently; + underReplicatedContainers = under; + unhealthyContainers = unhealthy; + pipelinesWaitingToClose = pipelinesToClose; + } + + public String getHost() { + return host; + } + + public long getSufficientlyReplicated() { + return sufficientlyReplicated; + } + + public long getPipelinesWaitingToClose() { + return pipelinesWaitingToClose; + } + + public long getUnderReplicatedContainers() { + return underReplicatedContainers; + } + + public long getUnhealthyContainers() { + return unhealthyContainers; + } + } + + private MetricsRegistry registry; + + private Map metricsByHost; + + /** Private constructor. */ + private NodeDecommissionMetrics() { + this.registry = new MetricsRegistry(METRICS_SOURCE_NAME); + metricsByHost = new HashMap<>(); + } + + /** + * Create and returns NodeDecommissionMetrics instance. + * + * @return NodeDecommissionMetrics + */ + public static NodeDecommissionMetrics create() { + return DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME, + "Metrics tracking the progress of nodes in the " + + "Decommissioning and Maintenance workflows. " + + "Tracks num nodes in mode and container " + + "replications state and pipelines waiting to close", + new NodeDecommissionMetrics()); + } + + /** + * Get aggregated gauge metrics. 
+ */ + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder builder = collector + .addRecord(METRICS_SOURCE_NAME); + decommissioningMaintenanceNodesTotal.snapshot(builder, all); + recommissionNodesTotal.snapshot(builder, all); + pipelinesWaitingToCloseTotal.snapshot(builder, all); + containersUnderReplicatedTotal.snapshot(builder, all); + containersUnhealthyTotal.snapshot(builder, all); + containersSufficientlyReplicatedTotal.snapshot(builder, all); + + MetricsRecordBuilder recordBuilder = builder; + for (Map.Entry<String, ContainerStateInWorkflow> e : + metricsByHost.entrySet()) { + recordBuilder = recordBuilder.endRecord().addRecord(METRICS_SOURCE_NAME) + .add(new MetricsTag(Interns.info("datanode", + "datanode host in decommission maintenance workflow"), + e.getValue().getHost())) + .addGauge(ContainerStateInWorkflow.HOST_PIPELINES_TO_CLOSE, + e.getValue().getPipelinesWaitingToClose()) + .addGauge(ContainerStateInWorkflow.HOST_UNDER_REPLICATED, + e.getValue().getUnderReplicatedContainers()) + .addGauge(ContainerStateInWorkflow.HOST_SUFFICIENTLY_REPLICATED, + e.getValue().getSufficientlyReplicated()) + .addGauge(ContainerStateInWorkflow.HOST_UNHEALTHY_CONTAINERS, + e.getValue().getUnhealthyContainers()); + } + recordBuilder.endRecord(); + } + + /** + * Unregister the metrics instance. + */ + public void unRegister() { + DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME); + } + + public synchronized void setDecommissioningMaintenanceNodesTotal( + long numNodesTracked) { + decommissioningMaintenanceNodesTotal + .set(numNodesTracked); + } + + public synchronized void setRecommissionNodesTotal( + long numNodesTrackedRecommissioned) { + recommissionNodesTotal.set(numNodesTrackedRecommissioned); + } + + public synchronized void setPipelinesWaitingToCloseTotal( + long numTrackedPipelinesWaitToClose) { + pipelinesWaitingToCloseTotal + .set(numTrackedPipelinesWaitToClose); + } + + public synchronized void setContainersUnderReplicatedTotal( + long numTrackedUnderReplicated) { + containersUnderReplicatedTotal + .set(numTrackedUnderReplicated); + } + + public synchronized void setContainersUnhealthyTotal( + long numTrackedUnhealthy) { + containersUnhealthyTotal + .set(numTrackedUnhealthy); + } + + public synchronized void setContainersSufficientlyReplicatedTotal( + long numTrackedSufficientlyReplicated) { + containersSufficientlyReplicatedTotal + .set(numTrackedSufficientlyReplicated); + } + + public synchronized long getDecommissioningMaintenanceNodesTotal() { + return decommissioningMaintenanceNodesTotal.value(); + } + + public synchronized long getRecommissionNodesTotal() { + return recommissionNodesTotal.value(); + } + + public synchronized long getPipelinesWaitingToCloseTotal() { + return pipelinesWaitingToCloseTotal.value(); + } + + public synchronized long getContainersUnderReplicatedTotal() { + return containersUnderReplicatedTotal.value(); + } + + public synchronized long getContainersUnhealthyTotal() { + return containersUnhealthyTotal.value(); + } + + public synchronized long getContainersSufficientlyReplicatedTotal() { + return containersSufficientlyReplicatedTotal.value(); + } + + public synchronized void metricRecordOfContainerStateByHost( + Map<String, ContainerStateInWorkflow> containerStatesByHost) { + metricsByHost.clear(); + metricsByHost.putAll(containerStatesByHost); + } + + @VisibleForTesting + public Long getPipelinesWaitingToCloseByHost(String host) { + ContainerStateInWorkflow workflowMetrics = metricsByHost.get(host); + return workflowMetrics == null ?
null : + workflowMetrics.getPipelinesWaitingToClose(); + } + + @VisibleForTesting + public Long getSufficientlyReplicatedByHost(String host) { + ContainerStateInWorkflow workflowMetrics = metricsByHost.get(host); + return workflowMetrics == null ? null : + workflowMetrics.getSufficientlyReplicated(); + } + + @VisibleForTesting + public Long getUnderReplicatedByHost(String host) { + ContainerStateInWorkflow workflowMetrics = metricsByHost.get(host); + return workflowMetrics == null ? null : + workflowMetrics.getUnderReplicatedContainers(); + } + + @VisibleForTesting + public Long getUnhealthyContainersByHost(String host) { + ContainerStateInWorkflow workflowMetrics = metricsByHost.get(host); + return workflowMetrics == null ? null : + workflowMetrics.getUnhealthyContainers(); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java new file mode 100644 index 000000000000..b5d4d1158d04 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorTestUtil.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount; +import org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.mockito.Mockito; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; +import static org.mockito.Mockito.reset; + +/** + * Helper class to provide common methods used to test DatanodeAdminMonitor + * and NodeDecommissionMetrics for tracking decommission and maintenance mode + * workflow progress. + */ +public final class DatanodeAdminMonitorTestUtil { + private DatanodeAdminMonitorTestUtil() { + } + + /** + * Generate a new ContainerReplica with the given containerID and State. + * @param containerID The ID the replica is associated with + * @param nodeState The persistedOpState stored in datanodeDetails. + * @param replicaState The state of the generated replica. + * @return A containerReplica with the given ID and state + */ + public static ContainerReplica generateReplica( + ContainerID containerID, + HddsProtos.NodeOperationalState nodeState, + StorageContainerDatanodeProtocolProtos.ContainerReplicaProto + .State replicaState) { + DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails(); + dn.setPersistedOpState(nodeState); + return ContainerReplica.newBuilder() + .setContainerState(replicaState) + .setContainerID(containerID) + .setSequenceId(1) + .setDatanodeDetails(dn) + .build(); + } + + /** + * Create a ContainerReplicaCount object, including a container with the + * requested ContainerID and state, along with a set of replicas of the given + * states. + * @param containerID The ID of the container to create an included + * @param containerState The state of the container + * @param states Create a replica for each of the given states. 
+ * @return A ContainerReplicaCount containing the generated container and + * replica set + */ + public static ContainerReplicaCount generateReplicaCount( + ContainerID containerID, + HddsProtos.LifeCycleState containerState, + HddsProtos.NodeOperationalState...states) { + Set<ContainerReplica> replicas = new HashSet<>(); + for (HddsProtos.NodeOperationalState s : states) { + replicas.add(generateReplica(containerID, s, CLOSED)); + } + ContainerInfo container = new ContainerInfo.Builder() + .setContainerID(containerID.getId()) + .setState(containerState) + .build(); + + return new RatisContainerReplicaCount(container, replicas, 0, 0, 3, 2); + } + + /** + * The only interaction the DatanodeAdminMonitor has with the + * ReplicationManager is to request a ContainerReplicaCount object for each + * container on nodes being decommissioned or moved to maintenance. This + * method mocks that interface to return a ContainerReplicaCount with a + * container in the given containerState and a set of replicas in the given + * replicaStates. + * @param containerState + * @param replicaStates + * @throws ContainerNotFoundException + */ + public static void mockGetContainerReplicaCount( + ReplicationManager repManager, + HddsProtos.LifeCycleState containerState, + HddsProtos.NodeOperationalState...replicaStates) + throws ContainerNotFoundException { + reset(repManager); + Mockito.when(repManager.getContainerReplicaCount( + Mockito.any(ContainerID.class))) + .thenAnswer(invocation -> + generateReplicaCount((ContainerID)invocation.getArguments()[0], + containerState, replicaStates)); + } + + /** + * This simple internal class is used to track and handle any DatanodeAdmin + * events fired by the DatanodeAdminMonitor during tests. + */ + public static class DatanodeAdminHandler implements + EventHandler<DatanodeDetails> { + private AtomicInteger invocation = new AtomicInteger(0); + + @Override + public void onMessage(final DatanodeDetails dn, + final EventPublisher publisher) { + invocation.incrementAndGet(); + } + + public int getInvocation() { + return invocation.get(); + } + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java index 095d137a5a7e..e2b4e13037c9 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java @@ -21,20 +21,12 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; -import org.apache.hadoop.hdds.scm.container.ContainerReplica; -import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import 
org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.junit.jupiter.api.BeforeEach; @@ -43,7 +35,6 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -51,8 +42,6 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; -import static org.mockito.Mockito.reset; /** * Tests to ensure the DatanodeAdminMonitor is working correctly. This class @@ -64,7 +53,8 @@ public class TestDatanodeAdminMonitor { private SimpleMockNodeManager nodeManager; private OzoneConfiguration conf; private DatanodeAdminMonitorImpl monitor; - private DatanodeAdminHandler startAdminHandler; + private DatanodeAdminMonitorTestUtil + .DatanodeAdminHandler startAdminHandler; private ReplicationManager repManager; private EventQueue eventQueue; @@ -73,7 +63,8 @@ public void setup() throws IOException, AuthenticationException { conf = new OzoneConfiguration(); eventQueue = new EventQueue(); - startAdminHandler = new DatanodeAdminHandler(); + startAdminHandler = new DatanodeAdminMonitorTestUtil + .DatanodeAdminHandler(); eventQueue.addHandler(SCMEvents.START_ADMIN_ON_NODE, startAdminHandler); nodeManager = new SimpleMockNodeManager(); @@ -82,6 +73,7 @@ public void setup() throws IOException, AuthenticationException { monitor = new DatanodeAdminMonitorImpl(conf, eventQueue, nodeManager, repManager); + monitor.setMetrics(NodeDecommissionMetrics.create()); } @Test @@ -173,11 +165,13 @@ public void testDecommissionNodeWaitsForContainersToReplicate() nodeManager.setContainers(dn1, generateContainers(3)); // Mock Replication Manager to return ContainerReplicaCount's which // always have a DECOMMISSIONED replica. - mockGetContainerReplicaCount( - HddsProtos.LifeCycleState.CLOSED, - DECOMMISSIONED, - IN_SERVICE, - IN_SERVICE); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount( + repManager, + HddsProtos.LifeCycleState.CLOSED, + DECOMMISSIONED, + IN_SERVICE, + IN_SERVICE); // Run the monitor for the first time and the node will transition to // REPLICATE_CONTAINERS as there are no pipelines to close. 
@@ -197,11 +191,13 @@ public void testDecommissionNodeWaitsForContainersToReplicate() // Now change the replicationManager mock to return 3 CLOSED replicas // and the node should complete the REPLICATE_CONTAINERS step, moving to // complete which will end the decommission workflow - mockGetContainerReplicaCount( - HddsProtos.LifeCycleState.CLOSED, - IN_SERVICE, - IN_SERVICE, - IN_SERVICE); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount( + repManager, + HddsProtos.LifeCycleState.CLOSED, + IN_SERVICE, + IN_SERVICE, + IN_SERVICE); monitor.run(); @@ -219,11 +215,13 @@ public void testDecommissionAbortedWhenNodeInUnexpectedState() HddsProtos.NodeState.HEALTHY)); nodeManager.setContainers(dn1, generateContainers(3)); - mockGetContainerReplicaCount( - HddsProtos.LifeCycleState.CLOSED, - DECOMMISSIONED, - IN_SERVICE, - IN_SERVICE); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount( + repManager, + HddsProtos.LifeCycleState.CLOSED, + DECOMMISSIONED, + IN_SERVICE, + IN_SERVICE); // Add the node to the monitor, it should have 3 under-replicated containers // after the first run @@ -254,9 +252,13 @@ public void testDecommissionAbortedWhenNodeGoesDead() HddsProtos.NodeState.HEALTHY)); nodeManager.setContainers(dn1, generateContainers(3)); - mockGetContainerReplicaCount( - HddsProtos.LifeCycleState.CLOSED, - DECOMMISSIONED, IN_SERVICE, IN_SERVICE); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount( + repManager, + HddsProtos.LifeCycleState.CLOSED, + DECOMMISSIONED, + IN_SERVICE, + IN_SERVICE); // Add the node to the monitor, it should have 3 under-replicated containers // after the first run @@ -344,11 +346,13 @@ public void testMaintenanceEndsWhileReplicatingContainers() HddsProtos.NodeState.HEALTHY)); nodeManager.setContainers(dn1, generateContainers(3)); - mockGetContainerReplicaCount( - HddsProtos.LifeCycleState.CLOSED, - IN_MAINTENANCE, - ENTERING_MAINTENANCE, - IN_MAINTENANCE); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount( + repManager, + HddsProtos.LifeCycleState.CLOSED, + IN_MAINTENANCE, + ENTERING_MAINTENANCE, + IN_MAINTENANCE); // Add the node to the monitor, it should transiting to // REPLICATE_CONTAINERS as the containers are under-replicated for @@ -432,51 +436,6 @@ private Set generateContainers(int count) { return containers; } - /** - * Create a ContainerReplicaCount object, including a container with the - * requested ContainerID and state, along with a set of replicas of the given - * states. - * @param containerID The ID of the container to create an included - * @param containerState The state of the container - * @param states Create a replica for each of the given states. - * @return A ContainerReplicaCount containing the generated container and - * replica set - */ - private ContainerReplicaCount generateReplicaCount(ContainerID containerID, - HddsProtos.LifeCycleState containerState, - HddsProtos.NodeOperationalState...states) { - Set replicas = new HashSet<>(); - for (HddsProtos.NodeOperationalState s : states) { - replicas.add(generateReplica(containerID, s, CLOSED)); - } - ContainerInfo container = new ContainerInfo.Builder() - .setContainerID(containerID.getId()) - .setState(containerState) - .build(); - - return new RatisContainerReplicaCount(container, replicas, 0, 0, 3, 2); - } - - /** - * Generate a new ContainerReplica with the given containerID and State. - * @param containerID The ID the replica is associated with - * @param nodeState The persistedOpState stored in datanodeDetails. 
- * @param replicaState The state of the generated replica. - * @return A containerReplica with the given ID and state - */ - private ContainerReplica generateReplica(ContainerID containerID, - HddsProtos.NodeOperationalState nodeState, - ContainerReplicaProto.State replicaState) { - DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails(); - dn.setPersistedOpState(nodeState); - return ContainerReplica.newBuilder() - .setContainerState(replicaState) - .setContainerID(containerID) - .setSequenceId(1) - .setDatanodeDetails(dn) - .build(); - } - /** * Helper method to get the first node from the set of trackedNodes within * the monitor. @@ -486,47 +445,4 @@ private DatanodeDetails getFirstTrackedNode() { return monitor.getTrackedNodes().toArray(new DatanodeDetails[0])[0]; } - - /** - * The only interaction the DatanodeAdminMonitor has with the - * ReplicationManager, is to request a ContainerReplicaCount object for each - * container on nodes being deocmmissioned or moved to maintenance. This - * method mocks that interface to return a ContainerReplicaCount with a - * container in the given containerState and a set of replias in the given - * replicaStates. - * @param containerState - * @param replicaStates - * @throws ContainerNotFoundException - */ - private void mockGetContainerReplicaCount( - HddsProtos.LifeCycleState containerState, - HddsProtos.NodeOperationalState...replicaStates) - throws ContainerNotFoundException { - reset(repManager); - Mockito.when(repManager.getContainerReplicaCount( - Mockito.any(ContainerID.class))) - .thenAnswer(invocation -> - generateReplicaCount((ContainerID)invocation.getArguments()[0], - containerState, replicaStates)); - } - - /** - * This simple internal class is used to track and handle any DatanodeAdmin - * events fired by the DatanodeAdminMonitor during tests. - */ - private static class DatanodeAdminHandler implements - EventHandler { - - private AtomicInteger invocation = new AtomicInteger(0); - - @Override - public void onMessage(final DatanodeDetails dn, - final EventPublisher publisher) { - invocation.incrementAndGet(); - } - - public int getInvocation() { - return invocation.get(); - } - } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionMetrics.java new file mode 100644 index 000000000000..0383ef122d0a --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionMetrics.java @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; + +/** + * Tests for the NodeDecommissionMetrics class. + */ +public class TestNodeDecommissionMetrics { + private NodeDecommissionMetrics metrics; + private SimpleMockNodeManager nodeManager; + private OzoneConfiguration conf; + private DatanodeAdminMonitorImpl monitor; + private DatanodeAdminMonitorTestUtil + .DatanodeAdminHandler startAdminHandler; + private ReplicationManager repManager; + private EventQueue eventQueue; + + + @BeforeEach + public void setup() { + conf = new OzoneConfiguration(); + eventQueue = new EventQueue(); + startAdminHandler = new DatanodeAdminMonitorTestUtil + .DatanodeAdminHandler(); + eventQueue.addHandler(SCMEvents.START_ADMIN_ON_NODE, startAdminHandler); + nodeManager = new SimpleMockNodeManager(); + repManager = Mockito.mock(ReplicationManager.class); + monitor = + new DatanodeAdminMonitorImpl( + conf, eventQueue, nodeManager, repManager); + metrics = NodeDecommissionMetrics.create(); + monitor.setMetrics(metrics); + } + + @AfterEach + public void after() { + metrics.unRegister(); + } + + /** + * Test for collecting metric for nodes tracked in decommissioning + * and maintenance workflow. Dn in entering maintenance mode. + */ + @Test + public void testDecommMonitorCollectTrackedNodes() { + DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails(); + nodeManager.register(dn1, + new NodeStatus(ENTERING_MAINTENANCE, + HddsProtos.NodeState.HEALTHY)); + monitor.startMonitoring(dn1); + monitor.run(); + Assertions.assertEquals(1, + metrics.getDecommissioningMaintenanceNodesTotal()); + } + + /** + * Test for collecting metric for nodes tracked in workflow that are + * in recommission workflow. Dn decommissioned, and recommissioned. 
+ */ + @Test + public void testDecommMonitorCollectRecommissionNodes() { + DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails(); + nodeManager.register(dn1, + new NodeStatus(DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + monitor.startMonitoring(dn1); + monitor.run(); + // Recommission by putting node back in service. + // Stop monitor and run. + monitor.stopMonitoring(dn1); + monitor.run(); + Assertions.assertEquals(0, + metrics.getDecommissioningMaintenanceNodesTotal()); + Assertions.assertEquals(1, + metrics.getRecommissionNodesTotal()); + } + + /** + * Test for collecting metric for pipelines waiting to be closed when + * datanode enters decommissioning workflow. + */ + @Test + public void testDecommMonitorCollectPipelinesWaitingClosed() { + DatanodeDetails dn1 = MockDatanodeDetails.createDatanodeDetails( + "datanode_host1", + "/r1/ng1"); + nodeManager.register(dn1, + new NodeStatus(HddsProtos.NodeOperationalState.DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + // Ensure the node has some pipelines + nodeManager.setPipelines(dn1, 2); + // Add the node to the monitor + monitor.startMonitoring(dn1); + monitor.run(); + // Ensure a StartAdmin event was fired + eventQueue.processAll(20000); + Assertions.assertEquals(2, + metrics.getPipelinesWaitingToCloseTotal()); + + // should have host specific metric collected + // for datanode_host1 + Assertions.assertEquals(2, + metrics.getPipelinesWaitingToCloseByHost( + "datanode_host1")); + // Clear the pipelines and the metric collected for + // datanode_host1 should clear + nodeManager.setPipelines(dn1, 0); + monitor.run(); + eventQueue.processAll(20000); + Assertions.assertEquals(0, + metrics.getPipelinesWaitingToCloseByHost( + "datanode_host1")); + } + + /** + * Test for collecting metric for under replicated containers + * from nodes in decommissioning and maintenance workflow. + */ + @Test + public void testDecommMonitorCollectUnderReplicated() + throws ContainerNotFoundException, NodeNotFoundException { + DatanodeDetails dn1 = MockDatanodeDetails.createDatanodeDetails( + "datanode_host1", + "/r1/ng1"); + nodeManager.register(dn1, + new NodeStatus(HddsProtos.NodeOperationalState.DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + + Set containers = new HashSet<>(); + containers.add(ContainerID.valueOf(1)); + + // create container with 3 replicas, 2 replicas in-service + // 1 decommissioned; will result in an under-replicated + // container + nodeManager.setContainers(dn1, containers); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount(repManager, + HddsProtos.LifeCycleState.CLOSED, + DECOMMISSIONED, + IN_SERVICE, + IN_SERVICE); + + // Add the node to the monitor, it should have 1 under-replicated + // container after the first run + monitor.startMonitoring(dn1); + monitor.run(); + Assertions.assertEquals(1, + metrics.getContainersUnderReplicatedTotal()); + + // should have host specific metric collected + // for datanode_host1 + Assertions.assertEquals(1, + metrics.getUnderReplicatedByHost("datanode_host1")); + } + + /** + * Test for collecting metric for sufficiently replicated containers + * from nodes in decommissioning and maintenance workflow. 
+ */ + @Test + public void testDecommMonitorCollectSufficientlyReplicated() + throws ContainerNotFoundException, NodeNotFoundException { + DatanodeDetails dn1 = MockDatanodeDetails.createDatanodeDetails( + "datanode_host1", + "/r1/ng1"); + nodeManager.register(dn1, + new NodeStatus(DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + Set containers = new HashSet<>(); + containers.add(ContainerID.valueOf(1)); + + // create container with 3 replicas, + // all in-service + nodeManager.setContainers(dn1, containers); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount(repManager, + HddsProtos.LifeCycleState.CLOSED, + IN_SERVICE, + IN_SERVICE, + IN_SERVICE); + monitor.startMonitoring(dn1); + + monitor.run(); + // expect dn in decommissioning workflow with container + // sufficiently replicated + Assertions.assertEquals(1, + metrics.getContainersSufficientlyReplicatedTotal()); + + // should have host specific metric collected + // for datanode_host1 + Assertions.assertEquals(1, + metrics.getSufficientlyReplicatedByHost("datanode_host1")); + } + + /** + * Test for collecting metric for unhealthy containers + * from nodes in decommissioning and maintenance workflow. + */ + @Test + public void testDecommMonitorCollectUnhealthyContainers() + throws ContainerNotFoundException, NodeNotFoundException { + DatanodeDetails dn1 = MockDatanodeDetails.createDatanodeDetails( + "datanode_host1", + "/r1/ng1"); + nodeManager.register(dn1, + new NodeStatus(DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + Set containers = new HashSet<>(); + containers.add(ContainerID.valueOf(1)); + + // set OPEN container with 1 replica CLOSED replica state, + // in-service node, generates monitored unhealthy container replica + nodeManager.setContainers(dn1, containers); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount(repManager, + HddsProtos.LifeCycleState.OPEN, + IN_SERVICE); + monitor.startMonitoring(dn1); + + monitor.run(); + Assertions.assertEquals(1, + metrics.getContainersUnhealthyTotal()); + + // should have host specific metric collected + // for datanode_host1 + Assertions.assertEquals(1, + metrics.getUnhealthyContainersByHost( + "datanode_host1")); + } + + /** + * Test for collecting aggregated metric for replicated state - + * total number of under-replicated containers over multiple + * datanodes in the decommissioning and maintenance workflow. 
+ */ + @Test + public void testDecommMonitorCollectionMultipleDnContainers() + throws ContainerNotFoundException, NodeNotFoundException { + // test metric aggregation over several datanodes + DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails(); + + nodeManager.register(dn1, + new NodeStatus(DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + nodeManager.register(dn2, + new NodeStatus(DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + + Set containersDn1 = new HashSet<>(); + containersDn1.add(ContainerID.valueOf(1)); + containersDn1.add(ContainerID.valueOf(2)); + + nodeManager.setContainers(dn1, containersDn1); + + Set containersDn2 = new HashSet<>(); + containersDn2.add(ContainerID.valueOf(3)); + + nodeManager.setContainers(dn2, containersDn2); + DatanodeAdminMonitorTestUtil + .mockGetContainerReplicaCount(repManager, + HddsProtos.LifeCycleState.CLOSED, + DECOMMISSIONED, + IN_SERVICE, + IN_SERVICE); + + monitor.startMonitoring(dn1); + monitor.startMonitoring(dn2); + + monitor.run(); + Assertions.assertEquals(3, + metrics.getContainersUnderReplicatedTotal()); + } + + /** + * Test for collecting aggregated metric for total number + * of pipelines waiting to close - over multiple + * datanodes in the decommissioning and maintenance workflow. + */ + @Test + public void testDecommMonitorCollectionMultipleDnPipelines() { + // test metric aggregation over several datanodes + DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails(); + + nodeManager.register(dn1, + new NodeStatus(DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + nodeManager.register(dn2, + new NodeStatus(DECOMMISSIONING, + HddsProtos.NodeState.HEALTHY)); + + nodeManager.setPipelines(dn1, 2); + nodeManager.setPipelines(dn2, 1); + + monitor.startMonitoring(dn1); + monitor.startMonitoring(dn2); + + monitor.run(); + Assertions.assertEquals(3, + metrics.getPipelinesWaitingToCloseTotal()); + } +}
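A minimal usage sketch, not part of the patch, showing how the new metrics are wired and read back. It relies only on APIs introduced above (NodeDecommissionMetrics.create(), DatanodeAdminMonitor.setMetrics(), the aggregate and per-host getters, unRegister()); the conf, eventQueue, nodeManager, replicationManager and dn objects are assumed to already exist, as in NodeDecommissionManager and the tests.

    // Hypothetical wiring, mirroring NodeDecommissionManager: create the
    // metrics source, hand it to the monitor, then read gauges after a run.
    DatanodeAdminMonitorImpl monitor = new DatanodeAdminMonitorImpl(
        conf, eventQueue, nodeManager, replicationManager);
    NodeDecommissionMetrics metrics = NodeDecommissionMetrics.create();
    monitor.setMetrics(metrics);

    monitor.startMonitoring(dn);   // track a datanode entering decommission
    monitor.run();                 // one monitor pass; ends in setMetricsToGauge()

    // Aggregate totals across all tracked datanodes.
    long tracked = metrics.getDecommissioningMaintenanceNodesTotal();
    long underReplicated = metrics.getContainersUnderReplicatedTotal();
    long pipelinesToClose = metrics.getPipelinesWaitingToCloseTotal();

    // Per-host breakdown recorded via metricRecordOfContainerStateByHost().
    Long hostPipelines =
        metrics.getPipelinesWaitingToCloseByHost(dn.getHostName());

    metrics.unRegister();          // as NodeDecommissionManager.stop() does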