Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,10 @@ public void onMessage(
getSCMDeletedBlockTransactionStatusManager()
.commitTransactions(ackProto.getResultsList(), dnId);
metrics.incrBlockDeletionCommandSuccess();
metrics.incrDNCommandsSuccess(dnId, 1);
} else if (status == CommandStatus.Status.FAILED) {
metrics.incrBlockDeletionCommandFailure();
metrics.incrDNCommandsFailure(dnId, 1);
} else {
LOG.debug("Delete Block Command {} is not executed on the Datanode" +
" {}.", commandStatus.getCmdId(), dnId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public EmptyTaskResult call() throws Exception {
new CommandForDatanode<>(dnId, command));
metrics.incrBlockDeletionCommandSent();
metrics.incrBlockDeletionTransactionSent(dnTXs.size());
metrics.incrDNCommandsSent(dnId, 1);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Added delete block command for datanode {} in the queue,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,36 @@

package org.apache.hadoop.hdds.scm.block;

import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.Interns;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* Metrics related to Block Deleting Service running in SCM.
*/
@Metrics(name = "ScmBlockDeletingService Metrics", about = "Metrics related to "
+ "background block deleting service in SCM", context = "SCM")
public final class ScmBlockDeletingServiceMetrics {
public final class ScmBlockDeletingServiceMetrics implements MetricsSource {

private static ScmBlockDeletingServiceMetrics instance;
public static final String SOURCE_NAME =
SCMBlockDeletingService.class.getSimpleName();
private final MetricsRegistry registry;

/**
* Given all commands are finished and no new coming deletes from OM.
Expand Down Expand Up @@ -86,10 +99,13 @@ public final class ScmBlockDeletingServiceMetrics {
@Metric(about = "The number of dataNodes of delete transactions.")
private MutableGaugeLong numBlockDeletionTransactionDataNodes;

private final Map<UUID, DatanodeCommandCounts> numCommandsDatanode = new ConcurrentHashMap<>();

private ScmBlockDeletingServiceMetrics() {
this.registry = new MetricsRegistry(SOURCE_NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verify if registry can be removed ... seems no usages here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove it we get this error:

org.apache.hadoop.metrics2.MetricsException: Hybrid metrics: registry required.
	at org.apache.hadoop.metrics2.lib.MetricsSourceBuilder.build(MetricsSourceBuilder.java:79)
	at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.register(MetricsSystemImpl.java:224)
	at org.apache.hadoop.hdds.scm.block.ScmBlockDeletingServiceMetrics.create(ScmBlockDeletingServiceMetrics.java:109)
	...

So we require it.

}

public static ScmBlockDeletingServiceMetrics create() {
public static synchronized ScmBlockDeletingServiceMetrics create() {
if (instance == null) {
MetricsSystem ms = DefaultMetricsSystem.instance();
instance = ms.register(SOURCE_NAME, "SCMBlockDeletingService",
Expand Down Expand Up @@ -152,6 +168,19 @@ public void setNumBlockDeletionTransactionDataNodes(long dataNodes) {
this.numBlockDeletionTransactionDataNodes.set(dataNodes);
}

public void incrDNCommandsSent(UUID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
.incrCommandsSent(delta);
}
public void incrDNCommandsSuccess(UUID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
.incrCommandsSuccess(delta);
}
public void incrDNCommandsFailure(UUID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
.incrCommandsFailure(delta);
}

public long getNumBlockDeletionCommandSent() {
return numBlockDeletionCommandSent.value();
}
Expand Down Expand Up @@ -196,6 +225,112 @@ public long getNumBlockDeletionTransactionDataNodes() {
return numBlockDeletionTransactionDataNodes.value();
}

@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder builder = metricsCollector.addRecord(SOURCE_NAME);
numBlockDeletionCommandSent.snapshot(builder, all);
numBlockDeletionCommandSuccess.snapshot(builder, all);
numBlockDeletionCommandFailure.snapshot(builder, all);
numBlockDeletionTransactionSent.snapshot(builder, all);
numBlockDeletionTransactionSuccess.snapshot(builder, all);
numBlockDeletionTransactionFailure.snapshot(builder, all);
numBlockDeletionTransactionCompleted.snapshot(builder, all);
numBlockDeletionTransactionCreated.snapshot(builder, all);
numSkippedTransactions.snapshot(builder, all);
numProcessedTransactions.snapshot(builder, all);
numBlockDeletionTransactionDataNodes.snapshot(builder, all);

MetricsRecordBuilder recordBuilder = builder;
for (Map.Entry<UUID, DatanodeCommandCounts> e : numCommandsDatanode.entrySet()) {
recordBuilder = recordBuilder.endRecord().addRecord(SOURCE_NAME)
.add(new MetricsTag(Interns.info("datanode",
"Datanode host for deletion commands"), e.getKey().toString()))
.addGauge(DatanodeCommandCounts.COMMANDS_SENT_TO_DN,
e.getValue().getCommandsSent())
.addGauge(DatanodeCommandCounts.COMMANDS_SUCCESSFUL_EXECUTION_BY_DN,
e.getValue().getCommandsSuccess())
.addGauge(DatanodeCommandCounts.COMMANDS_FAILED_EXECUTION_BY_DN,
e.getValue().getCommandsFailure());
}
recordBuilder.endRecord();
}

/**
* Class contains metrics related to the ScmBlockDeletingService for each datanode.
*/
public static final class DatanodeCommandCounts {
private long commandsSent;
private long commandsSuccess;
private long commandsFailure;

private static final MetricsInfo COMMANDS_SENT_TO_DN = Interns.info(
"CommandsSent",
"Number of commands sent from SCM to the datanode for deletion");
private static final MetricsInfo COMMANDS_SUCCESSFUL_EXECUTION_BY_DN = Interns.info(
"CommandsSuccess",
"Number of commands sent from SCM to the datanode for deletion for which execution succeeded.");
private static final MetricsInfo COMMANDS_FAILED_EXECUTION_BY_DN = Interns.info(
"CommandsFailed",
"Number of commands sent from SCM to the datanode for deletion for which execution failed.");

public DatanodeCommandCounts() {
this.commandsSent = 0;
this.commandsSuccess = 0;
this.commandsFailure = 0;
}

public void incrCommandsSent(long delta) {
this.commandsSent += delta;
}

public void incrCommandsSuccess(long delta) {
this.commandsSuccess += delta;
}

public void incrCommandsFailure(long delta) {
this.commandsFailure += delta;
}

public long getCommandsSent() {
return commandsSent;
}

public long getCommandsSuccess() {
return commandsSuccess;
}

public long getCommandsFailure() {
return commandsFailure;
}

@Override
public String toString() {
return "Sent=" + commandsSent + ", Success=" + commandsSuccess + ", Failed=" + commandsFailure;
}
}

public long getNumCommandsDatanodeSent() {
long sent = 0;
for (DatanodeCommandCounts v : numCommandsDatanode.values()) {
sent += v.commandsSent;
}
return sent;
}
public long getNumCommandsDatanodeSuccess() {
long successCount = 0;
for (DatanodeCommandCounts v : numCommandsDatanode.values()) {
successCount += v.commandsSuccess;
}
return successCount;
}
public long getNumCommandsDatanodeFailed() {
long failCount = 0;
for (DatanodeCommandCounts v : numCommandsDatanode.values()) {
failCount += v.commandsFailure;
}
return failCount;
}

@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
Expand All @@ -214,7 +349,9 @@ public String toString() {
.append("numBlockDeletionTransactionSuccess = "
+ numBlockDeletionTransactionSuccess.value()).append("\t")
.append("numBlockDeletionTransactionFailure = "
+ numBlockDeletionTransactionFailure.value());
+ numBlockDeletionTransactionFailure.value()).append("\t")
.append("numDeletionCommandsPerDatanode = "
+ numCommandsDatanode);
return buffer.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ public void testBlockDeletion(ReplicationConfig repConfig) throws Exception {

assertEquals(metrics.getNumBlockDeletionTransactionCreated(),
metrics.getNumBlockDeletionTransactionCompleted());
assertEquals(metrics.getNumBlockDeletionCommandSent(), metrics.getNumCommandsDatanodeSent());
assertEquals(metrics.getNumBlockDeletionCommandSuccess(), metrics.getNumCommandsDatanodeSuccess());
assertEquals(metrics.getBNumBlockDeletionCommandFailure(), metrics.getNumCommandsDatanodeFailed());
assertThat(metrics.getNumBlockDeletionCommandSent())
.isGreaterThanOrEqualTo(metrics.getNumBlockDeletionCommandSuccess() +
metrics.getBNumBlockDeletionCommandFailure());
Expand Down
Loading