Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
String scope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics =
new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
} else {
this.metrics = null;
}
this.metaCache = new MetaCache(this.metrics);

boolean shouldListen =
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
Expand All @@ -333,6 +326,15 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
}
retrieveClusterId();

if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
String scope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics =
new MetricsConnection(scope, this::getBatchPool, this::getMetaLookupPool);
} else {
this.metrics = null;
}
this.metaCache = new MetaCache(this.metrics);

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);

// Do we publish the status?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
Expand All @@ -58,6 +59,34 @@ public class MetricsConnection implements StatisticTrackable {
/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

/**
* Set to specify a custom scope for the metrics published through {@link MetricsConnection}.
* The scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
* clusterId and hashCode. For example, a default value for a connection to cluster "foo" might
* be "foo-7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl.
* Users may set this key to give a more contextual name for this scope. For example, one might
* want to differentiate a read connection from a write connection by setting the scopes to
* "foo-read" and "foo-write" respectively.
*
* Scope is the only thing that lends any uniqueness to the metrics. Care should be taken to
* avoid using the same scope for multiple Connections, otherwise the metrics may aggregate in
* unforeseen ways.
*/
public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";

/**
* Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY}
* or by generating a default from the passed clusterId and connectionObj's hashCode.
* @param conf configuration for the connection
* @param clusterId clusterId for the connection
* @param connectionObj either a Connection or AsyncConnectionImpl, the instance
* creating this MetricsConnection.
*/
static String getScope(Configuration conf, String clusterId, Object connectionObj) {
return conf.get(METRICS_SCOPE_KEY,
clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
}

private static final String CNT_BASE = "rpcCount_";
private static final String DRTN_BASE = "rpcCallDurationMs_";
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
Expand Down Expand Up @@ -252,7 +281,7 @@ private static interface NewMetric<T> {

private final MetricRegistry registry;
private final JmxReporter reporter;
private final String scope;
protected final String scope;

private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
@Override public Timer newMetric(Class<?> clazz, String name, String scope) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.RatioGauge.Ratio;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MetricsTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
Expand All @@ -35,9 +39,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
Expand Down Expand Up @@ -67,6 +70,49 @@ public static void afterClass() {
METRICS.shutdown();
}

@Test
public void testMetricsConnectionScopeAsyncClient() throws IOException {
Configuration conf = new Configuration();
String clusterId = "foo";
String scope = "testScope";
conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);

AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
assertTrue("Metrics should be present", metrics.isPresent());
assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.get().scope);
conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());

metrics = impl.getConnectionMetrics();
assertTrue("Metrics should be present", metrics.isPresent());
assertEquals(scope, metrics.get().scope);
}

@Test
public void testMetricsConnectionScopeBlockingClient() throws IOException {
Configuration conf = new Configuration();
String clusterId = "foo";
String scope = "testScope";
conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);

ConnectionRegistry mockRegistry = Mockito.mock(ConnectionRegistry.class);
Mockito.when(mockRegistry.getClusterId())
.thenReturn(CompletableFuture.completedFuture(clusterId));

ConnectionImplementation impl = new ConnectionImplementation(conf, null,
User.getCurrent(), mockRegistry);
MetricsConnection metrics = impl.getConnectionMetrics();
assertNotNull("Metrics should be present", metrics);
assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.scope);
conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
impl = new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry);

metrics = impl.getConnectionMetrics();
assertNotNull("Metrics should be present", metrics);
assertEquals(scope, metrics.scope);
}

@Test
public void testStaticMetrics() throws IOException {
final byte[] foo = Bytes.toBytes("foo");
Expand Down