diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index ce01db47b597..9bfd9433bbb2 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -127,6 +127,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>io.netty</groupId>
       <artifactId>netty-handler</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-text</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-junit-jupiter</artifactId>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
new file mode 100644
index 000000000000..29eaa835ea7a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import com.google.common.base.CaseFormat;
+import org.apache.commons.text.WordUtils;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Class contains metrics related to Datanode queues.
+ */
+@Metrics(about = "Datanode Queue Metrics", context = OzoneConsts.OZONE)
+public final class DatanodeQueueMetrics implements MetricsSource {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeQueueMetrics.class);
+
+  public static final String METRICS_SOURCE_NAME =
+      DatanodeQueueMetrics.class.getSimpleName();
+
+  public static final String STATE_CONTEXT_COMMAND_QUEUE_PREFIX =
+      "StateContextCommandQueue";
+  public static final String COMMAND_DISPATCHER_QUEUE_PREFIX =
+      "CommandDispatcherCommandQueue";
+  public static final String INCREMENTAL_REPORT_QUEUE_PREFIX =
+      "IncrementalReportQueue";
+  public static final String CONTAINER_ACTION_QUEUE_PREFIX =
+      "ContainerActionQueue";
+  public static final String PIPELINE_ACTION_QUEUE_PREFIX =
+      "PipelineActionQueue";
+
+  private MetricsRegistry registry;
+
+  private DatanodeStateMachine datanodeStateMachine;
+  private static DatanodeQueueMetrics instance;
+
+  private Map<SCMCommandProto.Type, MetricsInfo> stateContextCommandQueueMap;
+  private Map<SCMCommandProto.Type, MetricsInfo> commandDispatcherQueueMap;
+  private Map<InetSocketAddress, MetricsInfo> incrementalReportsQueueMap;
+  private Map<InetSocketAddress, MetricsInfo> containerActionQueueMap;
+  private Map<InetSocketAddress, MetricsInfo> pipelineActionQueueMap;
+
+  public DatanodeQueueMetrics(DatanodeStateMachine datanodeStateMachine) {
+    this.registry = new MetricsRegistry(METRICS_SOURCE_NAME);
+    this.datanodeStateMachine = datanodeStateMachine;
+
+    initializeQueues();
+  }
+
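+  /**
+   * Returns the singleton instance, registering it with the
+   * DefaultMetricsSystem on the first call; later calls return the
+   * existing instance.
+   */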
+  public static synchronized DatanodeQueueMetrics create(DatanodeStateMachine
+      datanodeStateMachine) {
+    if (instance != null) {
+      return instance;
+    }
+    instance = DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
+        "Queue metrics in Datanode",
+        new DatanodeQueueMetrics(datanodeStateMachine));
+    return instance;
+  }
+
+  private void initializeQueues() {
+    // Add queue from StateContext.commandQueue
+    stateContextCommandQueueMap = new HashMap<>();
+    for (SCMCommandProto.Type type: SCMCommandProto.Type.values()) {
+      stateContextCommandQueueMap.put(type, getMetricsInfo(
+          STATE_CONTEXT_COMMAND_QUEUE_PREFIX, String.valueOf(type)));
+    }
+
+    // Add queue from DatanodeStateMachine.commandDispatcher
+    commandDispatcherQueueMap = new HashMap<>();
+    for (SCMCommandProto.Type type: SCMCommandProto.Type.values()) {
+      commandDispatcherQueueMap.put(type, getMetricsInfo(
+          COMMAND_DISPATCHER_QUEUE_PREFIX, String.valueOf(type)));
+    }
+
+    // The per-endpoint maps for StateContext.incrementalReportsQueue,
+    // containerActionQueue and pipelineActionQueue are populated lazily
+    // through addEndpoint().
+    incrementalReportsQueueMap = new HashMap<>();
+    containerActionQueueMap = new HashMap<>();
+    pipelineActionQueueMap = new HashMap<>();
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder builder = collector.addRecord(METRICS_SOURCE_NAME);
+
+    // Per-command-type queue lengths, first from the StateContext command
+    // queue, then from the CommandDispatcher.
+    Map<SCMCommandProto.Type, Integer> tmpMap =
+        datanodeStateMachine.getContext().getCommandQueueSummary();
+    for (Map.Entry<SCMCommandProto.Type, MetricsInfo> entry:
+        stateContextCommandQueueMap.entrySet()) {
+      builder.addGauge(entry.getValue(),
+          (long) tmpMap.getOrDefault(entry.getKey(), 0));
+    }
+
+    tmpMap = datanodeStateMachine.getCommandDispatcher()
+        .getQueuedCommandCount();
+    for (Map.Entry<SCMCommandProto.Type, MetricsInfo> entry:
+        commandDispatcherQueueMap.entrySet()) {
+      builder.addGauge(entry.getValue(),
+          (long) tmpMap.getOrDefault(entry.getKey(), 0));
+    }
+
+    // Per-endpoint report and action queue lengths.
+    for (Map.Entry<InetSocketAddress, MetricsInfo> entry:
+        incrementalReportsQueueMap.entrySet()) {
+      builder.addGauge(entry.getValue(),
+          datanodeStateMachine.getContext()
+              .getIncrementalReportQueueSize().getOrDefault(entry.getKey(), 0));
+    }
+    for (Map.Entry<InetSocketAddress, MetricsInfo> entry:
+        containerActionQueueMap.entrySet()) {
+      builder.addGauge(entry.getValue(),
+          datanodeStateMachine.getContext()
+              .getContainerActionQueueSize().getOrDefault(entry.getKey(), 0));
+    }
+    for (Map.Entry<InetSocketAddress, MetricsInfo> entry:
+        pipelineActionQueueMap.entrySet()) {
+      builder.addGauge(entry.getValue(),
+          datanodeStateMachine.getContext().getPipelineActionQueueSize()
+              .getOrDefault(entry.getKey(), 0));
+    }
+  }
+
+  public static synchronized void unRegister() {
+    instance = null;
+    DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
+  }
+
+  public void addEndpoint(InetSocketAddress endpoint) {
+    // The endpoint host name is converted to UpperCamelCase so it can be
+    // embedded in the metric name.
+    incrementalReportsQueueMap.computeIfAbsent(endpoint,
+        k -> getMetricsInfo(INCREMENTAL_REPORT_QUEUE_PREFIX,
+            CaseFormat.UPPER_UNDERSCORE
+                .to(CaseFormat.UPPER_CAMEL, k.getHostName())));
+    containerActionQueueMap.computeIfAbsent(endpoint,
+        k -> getMetricsInfo(CONTAINER_ACTION_QUEUE_PREFIX,
+            CaseFormat.UPPER_UNDERSCORE
+                .to(CaseFormat.UPPER_CAMEL, k.getHostName())));
+    pipelineActionQueueMap.computeIfAbsent(endpoint,
+        k -> getMetricsInfo(PIPELINE_ACTION_QUEUE_PREFIX,
+            CaseFormat.UPPER_UNDERSCORE
+                .to(CaseFormat.UPPER_CAMEL, k.getHostName())));
+  }
+
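+  /**
+   * Builds the gauge as prefix + capitalized queue name + "Size"; for
+   * example, the closeContainerCommand entry of the StateContext command
+   * queue is published as
+   * "StateContextCommandQueueCloseContainerCommandSize".
+   */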
+  private MetricsInfo getMetricsInfo(String prefix, String metricName) {
+    String metric = prefix + WordUtils.capitalize(metricName) + "Size";
+    String description = "Queue size of " + metricName + " from " + prefix;
+    return info(metric, description);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index cc05511b58aa..e996edaa3782 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -117,6 +117,7 @@ public class DatanodeStateMachine implements Closeable {
   private final MeasuredReplicator replicatorMetrics;
   private final ReplicationSupervisorMetrics replicationSupervisorMetrics;
+  private final DatanodeQueueMetrics queueMetrics;
 
   /**
    * Constructs a datanode state machine.
    * @param datanodeDetails - DatanodeDetails used to identify a datanode
@@ -220,6 +221,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
         .addPublisherFor(PipelineReportsProto.class)
         .addPublisherFor(CRLStatusReport.class)
         .build();
+
+    queueMetrics = DatanodeQueueMetrics.create(this);
   }
 
   private int getEndPointTaskThreadPoolSize() {
@@ -412,6 +415,10 @@ public void close() throws IOException {
     if (commandDispatcher != null) {
       commandDispatcher.stop();
     }
+
+    if (queueMetrics != null) {
+      DatanodeQueueMetrics.unRegister();
+    }
   }
 
   /**
@@ -704,4 +711,8 @@ public UpgradeFinalizer getUpgradeFinalizer() {
   public ConfigurationSource getConf() {
     return conf;
   }
+
+  public DatanodeQueueMetrics getQueueMetrics() {
+    return queueMetrics;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 7a7f7926138e..eb44b3b8b630 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -40,6 +40,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Descriptors.Descriptor;
@@ -886,6 +887,9 @@ public void addEndpoint(InetSocketAddress endpoint) {
         mp.putIfAbsent(e, new AtomicBoolean(true));
       });
       this.isFullReportReadyToBeSent.putIfAbsent(endpoint, mp);
+      if (getQueueMetrics() != null) {
+        getQueueMetrics().addEndpoint(endpoint);
+      }
     }
   }
 
@@ -919,4 +923,23 @@ public void configureReconHeartbeatFrequency() {
   public long getReconHeartbeatFrequency() {
     return reconHeartbeatFrequency.get();
   }
+
+  public Map<InetSocketAddress, Integer> getPipelineActionQueueSize() {
+    return pipelineActions.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+  }
+
+  public Map<InetSocketAddress, Integer> getContainerActionQueueSize() {
+    return containerActions.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+  }
+
+  public Map<InetSocketAddress, Integer> getIncrementalReportQueueSize() {
+    return incrementalReportsQueue.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+  }
+
+  public DatanodeQueueMetrics getQueueMetrics() {
+    return parentDatanodeStateMachine.getQueueMetrics();
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestDatanodeQueueMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestDatanodeQueueMetrics.java
new file mode 100644
index 000000000000..b41bd15a970a
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestDatanodeQueueMetrics.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.metrics;
+
+import org.apache.commons.text.WordUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeQueueMetrics;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeQueueMetrics.COMMAND_DISPATCHER_QUEUE_PREFIX;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeQueueMetrics.STATE_CONTEXT_COMMAND_QUEUE_PREFIX;
+import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+/**
+ * Test for queue metrics of datanodes.
+ */
+@Timeout(300)
+public class TestDatanodeQueueMetrics {
+
+  private MiniOzoneHAClusterImpl cluster = null;
+  private OzoneConfiguration conf;
+  private String clusterId;
+  private String scmId;
+  private String omServiceId;
+  private static int numOfOMs = 3;
+  private String scmServiceId;
+  private static int numOfSCMs = 3;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestDatanodeQueueMetrics.class);
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
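+   * A one-datanode HA cluster is enough here: the queue gauges are
+   * registered as soon as the datanode state machine starts.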
+   *
+   * @throws Exception
+   */
+  @BeforeEach
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    omServiceId = "om-service-test1";
+    scmServiceId = "scm-service-test1";
+    cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOMServiceId(omServiceId)
+        .setSCMServiceId(scmServiceId)
+        .setNumOfStorageContainerManagers(numOfSCMs)
+        .setNumOfOzoneManagers(numOfOMs)
+        .setNumDatanodes(1)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterEach
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testQueueMetrics() {
+    // Every per-command-type gauge should be registered and non-negative,
+    // even before any command has been queued.
+    for (SCMCommandProto.Type type: SCMCommandProto.Type.values()) {
+      Assertions.assertTrue(
+          getGauge(STATE_CONTEXT_COMMAND_QUEUE_PREFIX +
+              WordUtils.capitalize(String.valueOf(type)) + "Size") >= 0);
+      Assertions.assertTrue(
+          getGauge(COMMAND_DISPATCHER_QUEUE_PREFIX +
+              WordUtils.capitalize(String.valueOf(type)) + "Size") >= 0);
+    }
+  }
+
+  // Reads a single gauge from the datanode queue metrics source.
+  private long getGauge(String metricName) {
+    return getLongGauge(metricName,
+        getMetrics(DatanodeQueueMetrics.METRICS_SOURCE_NAME));
+  }
+}
\ No newline at end of file