Skip to content

Commit

Permalink
add thread pool metric (#11461)
Browse files Browse the repository at this point in the history
* add thread pool metric,and run successfully

* add more thread pool metric,and run successfully

* delete comment

* avoid start import

* avoid start import and unused import

* avoid check

* ut pass

* Resolve unit test failures
The previous unit test run failed due to the introduction of thread pool metrics,
Unit tests now filter thread pool metrics

* Spin-off the applicationModel from the DefaultMetric Collector

---------

Co-authored-by: dongdong.yang <[email protected]>
  • Loading branch information
Webster-Yang and ddy1012 committed Feb 12, 2023
1 parent 62e96db commit cc32ebe
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public interface MetricsConstants {

String TAG_IP = "ip";

String TAG_PID = "pid";

String TAG_HOSTNAME = "hostname";

String TAG_APPLICATION_NAME = "application.name";
Expand Down Expand Up @@ -77,4 +79,6 @@ public interface MetricsConstants {
String PROMETHEUS_DEFAULT_JOB_NAME = "default_dubbo_job";

String METRIC_FILTER_START_TIME = "metric_filter_start_time";

String TAG_THREAD_NAME = "thread.pool.name";
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ private void initMetricsReporter() {
if (metricsConfig != null && PROTOCOL_PROMETHEUS.equals(metricsConfig.getProtocol())) {
collector.setCollectEnabled(true);
collector.addApplicationInfo(applicationModel.getApplicationName(), Version.getVersion());
collector.addThreadPool(applicationModel.getFrameworkModel(), applicationModel.getApplicationName());
String protocol = metricsConfig.getProtocol();
if (StringUtils.isNotEmpty(protocol)) {
MetricsReporterFactory metricsReporterFactory = getExtensionLoader(MetricsReporterFactory.class).getAdaptiveExtension();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ public enum MetricsCategory {
RT,
QPS,
REQUESTS,
THREAD_POOL,
APPLICATION
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public enum MetricsKey {
PROVIDER_METRIC_RT_P99("dubbo.provider.rt.seconds.p99", "Response Time P99"),
PROVIDER_METRIC_RT_P95("dubbo.provider.rt.seconds.p95", "Response Time P95"),

THREAD_POOL_CORE_SIZE("dubbo.thread.pool.core.size","Thread Pool Core Size"),
THREAD_POOL_LARGEST_SIZE("dubbo.thread.pool.largest.size","Thread Pool Largest Size"),
THREAD_POOL_MAX_SIZE("dubbo.thread.pool.max.size","Thread Pool Max Size"),
THREAD_POOL_ACTIVE_SIZE("dubbo.thread.pool.active.size","Thread Pool Active Size"),
THREAD_POOL_THREAD_COUNT("dubbo.thread.pool.thread.count","Thread Pool Thread Count"),
THREAD_POOL_QUEUE_SIZE("dubbo.thread.pool.queue.size","Thread Pool Queue Size"),

// consumer metrics key
;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.metrics.model;

import org.apache.dubbo.common.utils.ConfigUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;


import static org.apache.dubbo.common.constants.MetricsConstants.TAG_IP;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_PID;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_HOSTNAME;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_APPLICATION_NAME;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_THREAD_NAME;
import static org.apache.dubbo.common.utils.NetUtils.getLocalHost;
import static org.apache.dubbo.common.utils.NetUtils.getLocalHostName;

public class ThreadPoolMetric {

private String applicationName;

private String threadPoolName;

private ThreadPoolExecutor threadPoolExecutor;

public ThreadPoolMetric(String applicationName, String threadPoolName, ThreadPoolExecutor threadPoolExecutor) {
this.applicationName = applicationName;
this.threadPoolExecutor = threadPoolExecutor;
this.threadPoolName = threadPoolName;
}

public String getThreadPoolName() {
return threadPoolName;
}

public void setThreadPoolName(String threadPoolName) {
this.threadPoolName = threadPoolName;
}

public ThreadPoolExecutor getThreadPoolExecutor() {
return threadPoolExecutor;
}

public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}

public String getApplicationName() {
return applicationName;
}

public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ThreadPoolMetric that = (ThreadPoolMetric) o;
return Objects.equals(applicationName, that.applicationName) &&
Objects.equals(threadPoolName, that.threadPoolName);
}

@Override
public int hashCode() {
return Objects.hash(applicationName, threadPoolName);
}

public Map<String, String> getTags() {
Map<String, String> tags = new HashMap<>();
tags.put(TAG_IP, getLocalHost());
tags.put(TAG_PID, ConfigUtils.getPid()+"");
tags.put(TAG_HOSTNAME, getLocalHostName());
tags.put(TAG_APPLICATION_NAME, applicationName);

tags.put(TAG_THREAD_NAME, threadPoolName);
return tags;
}

public double getCorePoolSize() {
return threadPoolExecutor.getCorePoolSize();
}

public double getLargestPoolSize() {
return threadPoolExecutor.getLargestPoolSize();
}

public double getMaximumPoolSize() {
return threadPoolExecutor.getMaximumPoolSize();
}

public double getActiveCount() {
return threadPoolExecutor.getActiveCount();
}

public double getPoolSize(){
return threadPoolExecutor.getPoolSize();
}

public double getQueueSize(){
return threadPoolExecutor.getQueue().size();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,33 @@

package org.apache.dubbo.metrics.collector;

import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.metrics.collector.stat.MetricsStatComposite;
import org.apache.dubbo.metrics.collector.stat.MetricsStatHandler;
import org.apache.dubbo.metrics.event.EmptyEvent;
import org.apache.dubbo.metrics.event.MetricsEvent;
import org.apache.dubbo.metrics.event.SimpleMetricsEventMulticaster;
import org.apache.dubbo.metrics.listener.MetricsListener;
import org.apache.dubbo.metrics.model.MetricsKey;
import org.apache.dubbo.metrics.model.ThreadPoolMetric;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.Invocation;

import java.util.ArrayList;
import java.util.List;
import java.util.HashSet;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import static org.apache.dubbo.metrics.model.MetricsCategory.THREAD_POOL;
import static org.apache.dubbo.metrics.model.MetricsCategory.APPLICATION;
import static org.apache.dubbo.metrics.model.MetricsCategory.REQUESTS;
import static org.apache.dubbo.metrics.model.MetricsCategory.RT;
Expand All @@ -46,11 +55,12 @@
public class DefaultMetricsCollector implements MetricsCollector {

private AtomicBoolean collectEnabled = new AtomicBoolean(false);
private final Set<ThreadPoolMetric> threadPoolMetricSet = new HashSet<ThreadPoolMetric>();
private final MetricsStatComposite stats;
private final SimpleMetricsEventMulticaster eventMulticaster;

public DefaultMetricsCollector() {
this.stats = new MetricsStatComposite(this);
this.stats = new MetricsStatComposite( this);
this.eventMulticaster = SimpleMetricsEventMulticaster.getInstance();
}

Expand Down Expand Up @@ -112,22 +122,46 @@ public void addRT(String applicationName,Invocation invocation, Long responseTim
public void addApplicationInfo(String applicationName, String version) {
doExecute(MetricsEvent.Type.APPLICATION_INFO, statHandler -> statHandler.addApplication(applicationName,version));
}

public void addThreadPool(FrameworkModel frameworkModel, String applicationName) {
FrameworkExecutorRepository frameworkExecutorRepository =
frameworkModel.getBeanFactory().getBean(FrameworkExecutorRepository.class);
addThreadPoolExecutor(applicationName, "SharedExecutor", frameworkExecutorRepository.getSharedExecutor());
addThreadPoolExecutor(applicationName, "MappingRefreshingExecutor", frameworkExecutorRepository.getMappingRefreshingExecutor());
addThreadPoolExecutor(applicationName, "PoolRouterExecutor", frameworkExecutorRepository.getPoolRouterExecutor());
}

private void addThreadPoolExecutor(String applicationName, String threadPoolName, ExecutorService executorService) {
Optional<ExecutorService> executorOptional = Optional.ofNullable(executorService);
if (executorOptional.isPresent() && executorOptional.get() instanceof ThreadPoolExecutor ) {
threadPoolMetricSet.add(new ThreadPoolMetric(applicationName, threadPoolName,
(ThreadPoolExecutor) executorOptional.get()));
}
}

@Override
public List<MetricSample> collect() {
List<MetricSample> list = new ArrayList<>();
collectApplication(list);
collectRequests(list);
collectRT(list);

collectThreadPool(list);
return list;
}

private void collectThreadPool(List<MetricSample> list) {
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_CORE_SIZE, e.getTags(), THREAD_POOL, e::getCorePoolSize)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_LARGEST_SIZE, e.getTags(), THREAD_POOL, e::getLargestPoolSize)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_MAX_SIZE, e.getTags(), THREAD_POOL, e::getMaximumPoolSize)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_ACTIVE_SIZE, e.getTags(), THREAD_POOL, e::getActiveCount)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_THREAD_COUNT, e.getTags(), THREAD_POOL, e::getPoolSize)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_QUEUE_SIZE, e.getTags(), THREAD_POOL, e::getQueueSize)));
}

private void collectApplication(List<MetricSample> list) {
doCollect(MetricsEvent.Type.APPLICATION_INFO, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.APPLICATION_METRIC_INFO, k.getTags(),
APPLICATION, v::get))));


}

private void collectRequests(List<MetricSample> list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.dubbo.metrics.filter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.dubbo.common.constants.MetricsConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.metrics.model.MetricsKey;
Expand Down Expand Up @@ -322,6 +325,13 @@ private void initParam() {

private Map<String, MetricSample> getMetricsMap() {
List<MetricSample> samples = collector.collect();
return samples.stream().collect(Collectors.toMap(MetricSample::getName, Function.identity()));
List<MetricSample> samples1 = new ArrayList<>();
for (MetricSample sample : samples) {
if (sample.getName().contains("dubbo.thread.pool")) {
continue;
}
samples1.add(sample);
}
return samples1.stream().collect(Collectors.toMap(MetricSample::getName, Function.identity()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -55,6 +56,7 @@ class DefaultMetricsCollectorTest {
private String group;
private String version;
private RpcInvocation invocation;
public static final String DUBBO_THREAD_METRIC_MARK = "dubbo.thread.pool";

@BeforeEach
public void setup() {
Expand Down Expand Up @@ -93,6 +95,9 @@ void testRequestsMetrics() {

List<MetricSample> samples = collector.collect();
for (MetricSample sample : samples) {
if (sample.getName().contains(DUBBO_THREAD_METRIC_MARK)) {
continue;
}
Assertions.assertTrue(sample instanceof GaugeMetricSample);
GaugeMetricSample gaugeSample = (GaugeMetricSample) sample;
Map<String, String> tags = gaugeSample.getTags();
Expand All @@ -107,7 +112,14 @@ void testRequestsMetrics() {

collector.decreaseProcessingRequests(applicationName, invocation);
samples = collector.collect();
Map<String, Long> sampleMap = samples.stream().collect(Collectors.toMap(MetricSample::getName, k -> {
List<MetricSample> samples1 = new ArrayList<>();
for (MetricSample sample : samples) {
if (sample.getName().contains(DUBBO_THREAD_METRIC_MARK)) {
continue;
}
samples1.add(sample);
}
Map<String, Long> sampleMap = samples1.stream().collect(Collectors.toMap(MetricSample::getName, k -> {
Number number = ((GaugeMetricSample) k).getSupplier().get();
return number.longValue();
}));
Expand All @@ -125,15 +137,24 @@ void testRTMetrics() {

List<MetricSample> samples = collector.collect();
for (MetricSample sample : samples) {
if (sample.getName().contains(DUBBO_THREAD_METRIC_MARK)) {
continue;
}
Map<String, String> tags = sample.getTags();

Assertions.assertEquals(tags.get(TAG_INTERFACE_KEY), interfaceName);
Assertions.assertEquals(tags.get(TAG_METHOD_KEY), methodName);
Assertions.assertEquals(tags.get(TAG_GROUP_KEY), group);
Assertions.assertEquals(tags.get(TAG_VERSION_KEY), version);
}

Map<String, Long> sampleMap = samples.stream().collect(Collectors.toMap(MetricSample::getName, k -> {
List<MetricSample> samples1 = new ArrayList<>();
for (MetricSample sample : samples) {
if (sample.getName().contains(DUBBO_THREAD_METRIC_MARK)) {
continue;
}
samples1.add(sample);
}
Map<String, Long> sampleMap = samples1.stream().collect(Collectors.toMap(MetricSample::getName, k -> {
Number number = ((GaugeMetricSample) k).getSupplier().get();
return number.longValue();
}));
Expand Down

0 comments on commit cc32ebe

Please sign in to comment.