Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ public final class OzoneConfigKeys {
public static final int OZONE_CLIENT_EC_GRPC_RETRIES_MAX_DEFAULT = 3;
public static final String OZONE_GPRC_METRICS_PERCENTILES_INTERVALS_KEY
= "ozone.grpc.metrics.percentiles.intervals";
public static final String
OZONE_PROTOCOL_MESSAGE_METRICS_PERCENTILES_INTERVALS
= "ozone.protocol.message.metrics.percentiles.intervals";

/**
* Ozone administrator users delimited by comma.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
: new NoopTokenVerifier();

protocolMetrics =
new ProtocolMessageMetrics<>(
ProtocolMessageMetrics.create(
"HddsDispatcher",
"HDDS dispatcher metrics",
Type.values());
Type.values(),
conf);

this.dispatcher =
new OzoneProtocolMessageDispatcher<>("DatanodeClient",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,82 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.ratis.util.UncheckedAutoCloseable;

/**
* Metrics to count all the subtypes of a specific message.
*/
public class ProtocolMessageMetrics<KEY> implements MetricsSource {
public final class ProtocolMessageMetrics<KEY> implements MetricsSource {

private final String name;

private final String description;

private final boolean quantileEnable;

private final Map<KEY, AtomicLong> counters =
new ConcurrentHashMap<>();

private final Map<KEY, AtomicLong> elapsedTimes =
new ConcurrentHashMap<>();

private final Map<KEY, MutableQuantiles[]> quantiles =
new ConcurrentHashMap<>();

private final AtomicInteger concurrency = new AtomicInteger(0);

public static <KEY> ProtocolMessageMetrics<KEY> create(String name,
String description, KEY[] types) {
return new ProtocolMessageMetrics<KEY>(name, description, types);
String description, KEY[] types, ConfigurationSource conf) {
return new ProtocolMessageMetrics<KEY>(name, description, types, conf);
}

public ProtocolMessageMetrics(String name, String description,
KEY[] values) {
private ProtocolMessageMetrics(String name, String description,
KEY[] values, ConfigurationSource conf) {
this.name = name;
this.description = description;
int[] intervals = conf.getInts(
OzoneConfigKeys.OZONE_PROTOCOL_MESSAGE_METRICS_PERCENTILES_INTERVALS);
quantileEnable = (intervals.length > 0);
for (KEY value : values) {
counters.put(value, new AtomicLong(0));
elapsedTimes.put(value, new AtomicLong(0));
if (quantileEnable) {
MetricsRegistry registry =
new MetricsRegistry(value.toString() + "MessageMetrics");
MutableQuantiles[] mutableQuantiles =
new MutableQuantiles[intervals.length];
quantiles.put(value, mutableQuantiles);
for (int i = 0; i < intervals.length; i++) {
mutableQuantiles[i] = registry.newQuantiles(
intervals[i] + "s",
value.toString() + "rpc time in milli second",
"ops", "latencyMs", intervals[i]);
}
}
}
}

public void increment(KEY key, long duration) {
counters.get(key).incrementAndGet();
elapsedTimes.get(key).addAndGet(duration);
if (quantileEnable) {
MutableQuantiles[] mutableQuantiles = quantiles.get(key);
for (MutableQuantiles q : mutableQuantiles) {
q.add(duration);
}
quantiles.put(key, mutableQuantiles);
}
}

public UncheckedAutoCloseable measure(KEY key) {
Expand All @@ -74,7 +106,15 @@ public UncheckedAutoCloseable measure(KEY key) {
return () -> {
concurrency.decrementAndGet();
counters.get(key).incrementAndGet();
elapsedTimes.get(key).addAndGet(System.currentTimeMillis() - startTime);
long delta = System.currentTimeMillis() - startTime;
elapsedTimes.get(key).addAndGet(delta);
if (quantileEnable) {
MutableQuantiles[] mutableQuantiles = quantiles.get(key);
for (MutableQuantiles q : mutableQuantiles) {
q.add(delta);
}
quantiles.put(key, mutableQuantiles);
}
};
}

Expand All @@ -90,17 +130,20 @@ public void unregister() {
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
counters.forEach((key, value) -> {
MetricsRecordBuilder builder =
collector.addRecord(name);
MetricsRecordBuilder builder = collector.addRecord(name);
builder.add(
new MetricsTag(Interns.info("type", "Message type"), key.toString()));
builder.addCounter(new MetricName("counter", "Number of distinct calls"),
value.longValue());
builder.addCounter(
new MetricName("time", "Sum of the duration of the calls"),
elapsedTimes.get(key).longValue());
if (quantileEnable) {
for (MutableQuantiles mutableQuantiles : quantiles.get(key)) {
mutableQuantiles.snapshot(builder, all);
}
}
builder.endRecord();

});
MetricsRecordBuilder builder = collector.addRecord(name);
builder.addCounter(new MetricName("concurrency",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public SCMBlockProtocolServer(OzoneConfiguration conf,
ProtocolMessageMetrics.create(
"ScmBlockLocationProtocol",
"SCM Block location protocol counters",
ScmBlockLocationProtocolProtos.Type.values());
ScmBlockLocationProtocolProtos.Type.values(),
conf);

// SCM Block Service RPC.
BlockingService blockProtoPbService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public SCMClientProtocolServer(OzoneConfiguration conf,
protocolMetrics = ProtocolMessageMetrics
.create("ScmContainerLocationProtocol",
"SCM ContainerLocation protocol metrics",
StorageContainerLocationProtocolProtos.Type.values());
StorageContainerLocationProtocolProtos.Type.values(),
conf);

// SCM Container Service RPC
BlockingService storageProtoPbService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -156,7 +157,7 @@ public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
InetSocketAddress datanodeRpcAddr = getDataNodeBindAddress(
conf, scm.getScmNodeDetails());

protocolMessageMetrics = getProtocolMessageMetrics();
protocolMessageMetrics = getProtocolMessageMetrics(conf);

final int handlerCount = conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT);
Expand Down Expand Up @@ -469,10 +470,10 @@ private static String flatten(String input) {
* @return ProtocolMessageMetrics
*/
protected ProtocolMessageMetrics<ProtocolMessageEnum>
getProtocolMessageMetrics() {
getProtocolMessageMetrics(ConfigurationSource conf) {
return ProtocolMessageMetrics
.create("SCMDatanodeProtocol", "SCM Datanode protocol",
StorageContainerDatanodeProtocolProtos.Type.values());
StorageContainerDatanodeProtocolProtos.Type.values(), conf);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,14 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol,
// SCM security service RPC service.
RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
ProtobufRpcEngine.class);
metrics = new ProtocolMessageMetrics("ScmSecurityProtocol",
metrics = ProtocolMessageMetrics.create("ScmSecurityProtocol",
"SCM Security protocol metrics",
SCMSecurityProtocolProtos.Type.values());
secretKeyMetrics = new ProtocolMessageMetrics("ScmSecretKeyProtocol",
SCMSecurityProtocolProtos.Type.values(),
conf);
secretKeyMetrics = ProtocolMessageMetrics.create("ScmSecretKeyProtocol",
"SCM SecretKey protocol metrics",
SCMSecretKeyProtocolProtos.Type.values());
SCMSecretKeyProtocolProtos.Type.values(),
conf);
BlockingService secureProtoPbService =
SCMSecurityProtocolProtos.SCMSecurityProtocolService
.newReflectiveBlockingService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private void addPropertiesNotInXml() {
OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_SCRUBBING_SERVICE_TIMEOUT,
OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT,
OzoneConfigKeys.OZONE_GPRC_METRICS_PERCENTILES_INTERVALS_KEY,
OzoneConfigKeys.OZONE_PROTOCOL_MESSAGE_METRICS_PERCENTILES_INTERVALS,
ReconConfigKeys.RECON_SCM_CONFIG_PREFIX,
ReconConfigKeys.OZONE_RECON_ADDRESS_KEY,
ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)

omClientProtocolMetrics = ProtocolMessageMetrics
.create("OmClientProtocol", "Ozone Manager RPC endpoint",
OzoneManagerProtocolProtos.Type.values());
OzoneManagerProtocolProtos.Type.values(), conf);

// Start Om Rpc Server.
omRpcServer = getRpcServer(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
Expand Down Expand Up @@ -51,10 +52,10 @@ public ReconDatanodeProtocolServer(OzoneConfiguration conf,

@Override
public ProtocolMessageMetrics<ProtocolMessageEnum>
getProtocolMessageMetrics() {
getProtocolMessageMetrics(ConfigurationSource conf) {
return ProtocolMessageMetrics
.create("ReconDatanodeProtocol", "Recon Datanode protocol",
StorageContainerDatanodeProtocolProtos.Type.values());
StorageContainerDatanodeProtocolProtos.Type.values(), conf);
}

@Override
Expand Down