Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public class AsyncConnectionImpl implements AsyncConnection {

private final AtomicBoolean closed = new AtomicBoolean(false);

private final String metricsScope;
private final Optional<MetricsConnection> metrics;

private final ClusterStatusListener clusterStatusListener;
Expand All @@ -128,15 +129,16 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
SocketAddress localAddress, User user) {
this.conf = conf;
this.user = user;
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);

if (user.isLoginFromKeytab()) {
spawnRenewalChore(user.getUGI());
}
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
String scope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
this.metrics =
Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
Expand Down Expand Up @@ -235,7 +237,9 @@ public void close() {
choreService = null;
}
}
metrics.ifPresent(MetricsConnection::shutdown);
if (metrics.isPresent()) {
MetricsConnection.deleteMetricsConnection(metricsScope);
}
ConnectionOverAsyncConnection c = this.conn;
if (c != null) {
c.closePool();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand All @@ -47,13 +51,46 @@
/**
* This class is for maintaining the various connection statistics and publishing them through the
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
* as to not conflict with other uses of Yammer Metrics within the client application. Instantiating
* this class implicitly creates and "starts" instances of these classes; be sure to call
* {@link #shutdown()} to terminate the thread pools they allocate.
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
* {@link #getMetricsConnection()} implicitly creates and "starts" instances of these classes; be
* sure to call {@link #deleteMetricsConnection()} to terminate the thread pools they allocate. The
* metrics reporter will be shutdown {@link #shutdown()} when all connections within this metrics
* instances are closed.
*/
@InterfaceAudience.Private
public class MetricsConnection implements StatisticTrackable {

static final Map<String, MetricsConnection> METRICS_INSTANCES =
new HashMap<String, MetricsConnection>();

static MetricsConnection getMetricsConnection(final String scope,
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
MetricsConnection metrics;
synchronized (METRICS_INSTANCES) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider a synchronized or concurrent map type instead.

Copy link
Contributor Author

@vli02 vli02 Nov 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if a synchronized or concurrent map can protect the entire block, I want this entire block to run in single thread mode. Especially in the deletion method below, the decrementing count, getting count, and remove it from the map have to be single threaded. @apurtell @d-c-manning

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scope here should be unique per async connection object, is that correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the scope that you meant here is the scope of this code block, it is per async connection object, and the single metrics object which might be shared among multiple async connection objects. Please refer the comment in the deleteMetricsConnection() code block, too.

metrics = METRICS_INSTANCES.get(scope);
if (metrics == null) {
metrics = new MetricsConnection(scope, batchPool, metaPool);
METRICS_INSTANCES.put(scope, metrics);
} else {
metrics.addThreadPools(batchPool, metaPool);
}
metrics.incrConnectionCount();
}
return metrics;
}

static void deleteMetricsConnection(final String scope) {
synchronized (METRICS_INSTANCES) {
MetricsConnection metrics = METRICS_INSTANCES.get(scope);
if (metrics == null) return;
metrics.decrConnectionCount();
if (metrics.getConnectionCount() == 0) {
METRICS_INSTANCES.remove(scope);
metrics.shutdown();
}
}
}

/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

Expand Down Expand Up @@ -295,8 +332,13 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
}
};

// List of thread pool per connection of the metrics.
private List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
private List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();

// static metrics

protected final Counter connectionCount;
protected final Counter metaCacheHits;
protected final Counter metaCacheMisses;
protected final CallTracker getTracker;
Expand Down Expand Up @@ -331,30 +373,52 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
protected final ConcurrentMap<String, Counter> rpcCounters =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
this.scope = scope;
addThreadPools(batchPool, metaPool);
this.registry = new MetricRegistry();
this.registry.register(getExecutorPoolName(), new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor pool = batchPool.get();
if (pool == null) {
return Ratio.of(0, 0);
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among batch pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
}
}
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
return Ratio.of(numerator, denominator);
}
});
this.registry.register(getMetaPoolName(), new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor pool = metaPool.get();
if (pool == null) {
return Ratio.of(0, 0);
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among meta lookup pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
}
}
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
return Ratio.of(numerator, denominator);
}
});
this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
this.metaCacheNumClearServer =
Expand Down Expand Up @@ -396,10 +460,6 @@ MetricRegistry getMetricRegistry() {
return registry;
}

public void shutdown() {
this.reporter.stop();
}

/** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
public static CallStats newCallStats() {
// TODO: instance pool to reduce GC?
Expand Down Expand Up @@ -457,6 +517,27 @@ public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
overloadedBackoffTimer.update(time, timeUnit);
}

/** Return the connection count of the metrics within a scope */
public long getConnectionCount() {
return connectionCount.getCount();
}

/** Increment the connection count of the metrics within a scope */
private void incrConnectionCount() {
connectionCount.inc();
}

/** Decrement the connection count of the metrics within a scope */
private void decrConnectionCount() {
connectionCount.dec();
}

/** Add thread pools of additional connections to the metrics */
private void addThreadPools(Supplier batchPool, Supplier metaPool) {
batchPools.add(batchPool);
metaPools.add(metaPool);
}

/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
Expand All @@ -474,6 +555,10 @@ private void updateRpcGeneric(String methodName, CallStats stats) {
.update(stats.getResponseSizeBytes());
}

private void shutdown() {
this.reporter.stop();
}

/** Report RPC context to metrics system. */
public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
int callsPerServer = stats.getConcurrentCallsPerServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.RatioGauge.Ratio;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -61,14 +63,16 @@ public class TestMetricsConnection {
private static final ThreadPoolExecutor BATCH_POOL =
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);

private static final String MOCK_CONN_STR = "mocked-connection";

@BeforeClass
public static void beforeClass() {
METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, () -> null);
METRICS = MetricsConnection.getMetricsConnection(MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
}

@AfterClass
public static void afterClass() {
METRICS.shutdown();
MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
}

@Test
Expand All @@ -90,6 +94,51 @@ public void testMetricsConnectionScope() throws IOException {
assertEquals(scope, metrics.get().scope);
}

@Test
public void testMetricsWithMutiConnections() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");

User user = User.getCurrent();

/* create multiple connections */
final int num = 3;
AsyncConnectionImpl impl;
List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>();
for (int i = 0; i < num; i++) {
impl = new AsyncConnectionImpl(conf, null, null, null, user);
connList.add(impl);
}

/* verify metrics presence */
impl = connList.get(0);
Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
assertTrue("Metrics should be present", metrics.isPresent());

/* verify connection count in a shared metrics */
long count = metrics.get().getConnectionCount();
assertEquals("Failed to verify connection count." + count, count, num);

/* close some connections */
for (int i = 0; i < num - 1; i++) {
connList.get(i).close();
}

/* verify metrics presence again */
impl = connList.get(num - 1);
metrics = impl.getConnectionMetrics();
assertTrue("Metrics should be present after some of connections are closed.",
metrics.isPresent());

/* verify count of remaining connections */
count = metrics.get().getConnectionCount();
assertEquals("Connection count suppose to be 1 but got: " + count, count, 1);

/* shutdown */
impl.close();
}

@Test
public void testStaticMetrics() throws IOException {
final byte[] foo = Bytes.toBytes("foo");
Expand Down