Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hadoop-hdds/container-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.ozone.container.common.statemachine;

import com.google.common.base.CaseFormat;
import org.apache.commons.text.WordUtils;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.ozone.OzoneConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.metrics2.lib.Interns.info;

/**
* Class contains metrics related to Datanode queues.
*/
@Metrics(about = "Datanode Queue Metrics", context = OzoneConsts.OZONE)
public final class DatanodeQueueMetrics implements MetricsSource {

private static final Logger LOG =
LoggerFactory.getLogger(DatanodeQueueMetrics.class);

public static final String METRICS_SOURCE_NAME =
DatanodeQueueMetrics.class.getSimpleName();

public static final String STATE_CONTEXT_COMMAND_QUEUE_PREFIX =
"StateContextCommandQueue";
public static final String COMMAND_DISPATCHER_QUEUE_PREFIX =
"CommandDispatcherCommandQueue";
public static final String INCREMENTAL_REPORT_QUEUE_PREFIX =
"IncrementalReportQueue";
public static final String CONTAINER_ACTION_QUEUE_PREFIX =
"ContainerActionQueue";
public static final String PIPELINE_ACTION_QUEUE_PREFIX =
"PipelineActionQueue";

private MetricsRegistry registry;

private DatanodeStateMachine datanodeStateMachine;
private static DatanodeQueueMetrics instance;

private Map<SCMCommandProto.Type, MetricsInfo> stateContextCommandQueueMap;
private Map<SCMCommandProto.Type, MetricsInfo> commandDispatcherQueueMap;
private Map<InetSocketAddress, MetricsInfo> incrementalReportsQueueMap;
private Map<InetSocketAddress, MetricsInfo> containerActionQueueMap;
private Map<InetSocketAddress, MetricsInfo> pipelineActionQueueMap;

public DatanodeQueueMetrics(DatanodeStateMachine datanodeStateMachine) {
this.registry = new MetricsRegistry(METRICS_SOURCE_NAME);
this.datanodeStateMachine = datanodeStateMachine;

initializeQueues();
}

public static synchronized DatanodeQueueMetrics create(DatanodeStateMachine
datanodeStateMachine) {
if (instance != null) {
return instance;
}
instance = DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
"Queue metrics in Datanode",
new DatanodeQueueMetrics(datanodeStateMachine));
return instance;
}

private void initializeQueues() {
// Add queue from StateContext.commandQueue
stateContextCommandQueueMap = new HashMap<>();
for (SCMCommandProto.Type type: SCMCommandProto.Type.values()) {
stateContextCommandQueueMap.put(type, getMetricsInfo(
STATE_CONTEXT_COMMAND_QUEUE_PREFIX, String.valueOf(type)));
}

// Add queue from DatanodeStateMachine.commandDispatcher
commandDispatcherQueueMap = new HashMap<>();
for (SCMCommandProto.Type type: SCMCommandProto.Type.values()) {
commandDispatcherQueueMap.put(type, getMetricsInfo(
COMMAND_DISPATCHER_QUEUE_PREFIX, String.valueOf(type)));
}

// Initialize queue for StateContext.incrementalReportQueue,
// containerActionQueue, pipelineActionQueue
incrementalReportsQueueMap = new HashMap<>();
containerActionQueueMap = new HashMap<>();
pipelineActionQueueMap = new HashMap<>();
}

@Override
public void getMetrics(MetricsCollector collector, boolean b) {
MetricsRecordBuilder builder = collector.addRecord(METRICS_SOURCE_NAME);

Map<SCMCommandProto.Type, Integer> tmpMap =
datanodeStateMachine.getContext().getCommandQueueSummary();
for (Map.Entry<SCMCommandProto.Type, MetricsInfo> entry:
stateContextCommandQueueMap.entrySet()) {
builder.addGauge(entry.getValue(),
(long) tmpMap.getOrDefault(entry.getKey(), 0));
}

tmpMap = datanodeStateMachine.getCommandDispatcher()
.getQueuedCommandCount();
for (Map.Entry<SCMCommandProto.Type, MetricsInfo> entry:
commandDispatcherQueueMap.entrySet()) {
builder.addGauge(entry.getValue(),
(long) tmpMap.getOrDefault(entry.getKey(), 0));
}

for (Map.Entry<InetSocketAddress, MetricsInfo> entry:
incrementalReportsQueueMap.entrySet()) {
builder.addGauge(entry.getValue(),
datanodeStateMachine.getContext()
.getIncrementalReportQueueSize().getOrDefault(entry.getKey(), 0));
}
for (Map.Entry<InetSocketAddress, MetricsInfo> entry:
containerActionQueueMap.entrySet()) {
builder.addGauge(entry.getValue(),
datanodeStateMachine.getContext()
.getContainerActionQueueSize().getOrDefault(entry.getKey(), 0));
}
for (Map.Entry<InetSocketAddress, MetricsInfo> entry:
pipelineActionQueueMap.entrySet()) {
builder.addGauge(entry.getValue(),
datanodeStateMachine.getContext().getPipelineActionQueueSize()
.getOrDefault(entry.getKey(), 0));
}
}

public static synchronized void unRegister() {
instance = null;
DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
}

public void addEndpoint(InetSocketAddress endpoint) {
incrementalReportsQueueMap.computeIfAbsent(endpoint,
k -> getMetricsInfo(INCREMENTAL_REPORT_QUEUE_PREFIX,
CaseFormat.UPPER_UNDERSCORE
.to(CaseFormat.UPPER_CAMEL, k.getHostName())));
containerActionQueueMap.computeIfAbsent(endpoint,
k -> getMetricsInfo(CONTAINER_ACTION_QUEUE_PREFIX,
CaseFormat.UPPER_UNDERSCORE
.to(CaseFormat.UPPER_CAMEL, k.getHostName())));
pipelineActionQueueMap.computeIfAbsent(endpoint,
k -> getMetricsInfo(PIPELINE_ACTION_QUEUE_PREFIX,
CaseFormat.UPPER_UNDERSCORE
.to(CaseFormat.UPPER_CAMEL, k.getHostName())));
}

private MetricsInfo getMetricsInfo(String prefix, String metricName) {
String metric = prefix + WordUtils.capitalize(metricName);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should append count / size / total to the end of the metric name? Most counters seem to end in something like this.

String description = "Queue size of " + metricName + " from " + prefix;
return info(metric, description);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class DatanodeStateMachine implements Closeable {
private final MeasuredReplicator replicatorMetrics;
private final ReplicationSupervisorMetrics replicationSupervisorMetrics;

private final DatanodeQueueMetrics queueMetrics;
/**
* Constructs a datanode state machine.
* @param datanodeDetails - DatanodeDetails used to identify a datanode
Expand Down Expand Up @@ -220,6 +221,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.addPublisherFor(PipelineReportsProto.class)
.addPublisherFor(CRLStatusReport.class)
.build();

queueMetrics = DatanodeQueueMetrics.create(this);
}

private int getEndPointTaskThreadPoolSize() {
Expand Down Expand Up @@ -412,6 +415,10 @@ public void close() throws IOException {
if (commandDispatcher != null) {
commandDispatcher.stop();
}

if (queueMetrics != null) {
DatanodeQueueMetrics.unRegister();
}
}

/**
Expand Down Expand Up @@ -704,4 +711,8 @@ public UpgradeFinalizer<DatanodeStateMachine> getUpgradeFinalizer() {
public ConfigurationSource getConf() {
return conf;
}

public DatanodeQueueMetrics getQueueMetrics() {
return queueMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.Descriptor;
Expand Down Expand Up @@ -886,6 +887,9 @@ public void addEndpoint(InetSocketAddress endpoint) {
mp.putIfAbsent(e, new AtomicBoolean(true));
});
this.isFullReportReadyToBeSent.putIfAbsent(endpoint, mp);
if (getQueueMetrics() != null) {
getQueueMetrics().addEndpoint(endpoint);
}
}
}

Expand Down Expand Up @@ -919,4 +923,23 @@ public void configureReconHeartbeatFrequency() {
public long getReconHeartbeatFrequency() {
return reconHeartbeatFrequency.get();
}

public Map<InetSocketAddress, Integer> getPipelineActionQueueSize() {
return pipelineActions.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
}

public Map<InetSocketAddress, Integer> getContainerActionQueueSize() {
return containerActions.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
}

public Map<InetSocketAddress, Integer> getIncrementalReportQueueSize() {
return incrementalReportsQueue.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
}

public DatanodeQueueMetrics getQueueMetrics() {
return parentDatanodeStateMachine.getQueueMetrics();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.ozone.container.metrics;

import org.apache.commons.text.WordUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeQueueMetrics;
import org.junit.Rule;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.UUID;

import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeQueueMetrics.COMMAND_DISPATCHER_QUEUE_PREFIX;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeQueueMetrics.STATE_CONTEXT_COMMAND_QUEUE_PREFIX;
import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

/**
* Test for queue metrics of datanodes.
*/
public class TestDatanodeQueueMetrics {

private MiniOzoneHAClusterImpl cluster = null;
private OzoneConfiguration conf;
private String clusterId;
private String scmId;
private String omServiceId;
private static int numOfOMs = 3;
private String scmServiceId;
private static int numOfSCMs = 3;

private static final Logger LOG = LoggerFactory
.getLogger(TestDatanodeQueueMetrics.class);

@Rule
public Timeout timeout = new Timeout(300_000);

/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
@BeforeEach
public void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
omServiceId = "om-service-test1";
scmServiceId = "scm-service-test1";
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
.setOMServiceId(omServiceId)
.setSCMServiceId(scmServiceId)
.setNumOfStorageContainerManagers(numOfSCMs)
.setNumOfOzoneManagers(numOfOMs)
.setNumDatanodes(1)
.build();
cluster.waitForClusterToBeReady();
}
/**
* Set a timeout for each test.
*/

@Test
public void testQueueMetrics() {

for (SCMCommandProto.Type type: SCMCommandProto.Type.values()) {
Assertions.assertEquals(0,
getGauge(STATE_CONTEXT_COMMAND_QUEUE_PREFIX +
WordUtils.capitalize(String.valueOf(type))));
Assertions.assertEquals(0,
getGauge(COMMAND_DISPATCHER_QUEUE_PREFIX +
WordUtils.capitalize(String.valueOf(type))));
}

}

private long getGauge(String metricName) {
return getLongGauge(metricName,
getMetrics(DatanodeQueueMetrics.METRICS_SOURCE_NAME));
}
}