Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ private void processCmd(DeleteCmdInfo cmd) {
ContainerBlocksDeletionACKProto blockDeletionACK = null;
long startTime = Time.monotonicNow();
boolean cmdExecuted = false;
int successCount = 0, failedCount = 0;
try {
// move blocks to deleting state.
// this is a metadata update, the actual deletion happens in another
Expand Down Expand Up @@ -381,7 +382,6 @@ private void processCmd(DeleteCmdInfo cmd) {
// Send ACK back to SCM as long as meta updated
// TODO Or we should wait until the blocks are actually deleted?
if (!containerBlocks.isEmpty()) {
int successCount = 0, failedCount = 0;

for (DeleteBlockTransactionResult result : blockDeletionACK.getResultsList()) {
boolean success = result.getSuccess();
Expand All @@ -399,8 +399,6 @@ private void processCmd(DeleteCmdInfo cmd) {
blockDeleteMetrics.incrProcessedTransactionFailCount(1);
}
}
LOG.info("Sending deletion ACK to SCM, successTransactionCount = {}," +
"failedTransactionCount= {}", successCount, failedCount);
}
cmdExecuted = true;
} finally {
Expand All @@ -418,6 +416,8 @@ private void processCmd(DeleteCmdInfo cmd) {
updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG);
long endTime = Time.monotonicNow();
this.opsLatencyMs.add(endTime - startTime);
// Log the duration of this invocation. opsLatencyMs is the latency metric
// object updated just above, not the elapsed time of this command.
LOG.info("Sending deletion ACK to SCM, successTransactionCount = {}," +
"failedTransactionCount = {}, time elapsed = {}", successCount, failedCount, endTime - startTime);
invocationCount++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,18 @@ private void getTransaction(DeletedBlocksTransaction tx,
DeletedBlocksTransaction.newBuilder(tx)
.setCount(transactionStatusManager.getRetryCount(tx.getTxID()))
.build();
boolean flag = false;
for (ContainerReplica replica : replicas) {
final DatanodeID datanodeID = replica.getDatanodeDetails().getID();
if (!transactionStatusManager.isDuplication(
datanodeID, tx.getTxID(), commandStatus)) {
transactions.addTransactionToDN(datanodeID, updatedTxn);
metrics.incrProcessedTransaction();
flag = true;
}
}
if (flag) {
metrics.incrProcessedTransaction();
}
}

private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,20 +186,26 @@ public EmptyTaskResult call() throws Exception {
for (Map.Entry<DatanodeID, List<DeletedBlocksTransaction>> entry :
transactions.getDatanodeTransactionMap().entrySet()) {
DatanodeID dnId = entry.getKey();
long blocksToDn = 0;
List<DeletedBlocksTransaction> dnTXs = entry.getValue();
if (!dnTXs.isEmpty()) {
Set<Long> dnTxSet = dnTXs.stream()
.map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toSet());
for (DeletedBlocksTransaction deletedBlocksTransaction : dnTXs) {
blocksToDn +=
deletedBlocksTransaction.getLocalIDList().size();
}
processedTxIDs.addAll(dnTxSet);
DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
command.setTerm(scmContext.getTermOfLeader());
deletedBlockLog.recordTransactionCreated(dnId, command.getId(),
dnTxSet);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(dnId, command));
metrics.incrNumBlockDeletionSentDN(dnId, blocksToDn);
metrics.incrBlockDeletionCommandSent();
metrics.incrBlockDeletionTransactionSent(dnTXs.size());
metrics.incrBlockDeletionTransactionsOnDatanodes(dnTXs.size());
metrics.incrDNCommandsSent(dnId, 1);
if (LOG.isDebugEnabled()) {
LOG.debug(
Expand All @@ -211,6 +217,7 @@ public EmptyTaskResult call() throws Exception {
}
}
}
metrics.incrTotalBlockSentToDNForDeletion(transactions.getBlocksDeleted());
LOG.info("Totally added {} blocks to be deleted for"
+ " {} datanodes / {} totalnodes, limit per iteration : {}blocks, task elapsed time: {}ms",
transactions.getBlocksDeleted(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,11 @@ public void commitTransactions(List<DeleteBlockTransactionResult> transactionRes
for (DeleteBlockTransactionResult transactionResult :
transactionResults) {
if (isTransactionFailed(transactionResult)) {
metrics.incrBlockDeletionTransactionFailure();
metrics.incrBlockDeletionTransactionFailureOnDatanodes();
continue;
}
try {
metrics.incrBlockDeletionTransactionSuccess();
metrics.incrBlockDeletionTransactionSuccessOnDatanodes();
long txID = transactionResult.getTxID();
// set of dns which have successfully committed transaction txId.
final Set<DatanodeID> dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ public final class ScmBlockDeletingServiceMetrics implements MetricsSource {

@Metric(about = "The number of individual delete transactions sent to " +
"all DN.")
private MutableCounterLong numBlockDeletionTransactionSent;
private MutableCounterLong numBlockDeletionTransactionsOnDatanodes;

@Metric(about = "The number of success execution of delete transactions.")
private MutableCounterLong numBlockDeletionTransactionSuccess;
private MutableCounterLong numBlockDeletionTransactionSuccessOnDatanodes;

@Metric(about = "The number of failure execution of delete transactions.")
private MutableCounterLong numBlockDeletionTransactionFailure;
private MutableCounterLong numBlockDeletionTransactionFailureOnDatanodes;

@Metric(about = "The number of completed txs which are removed from DB.")
private MutableCounterLong numBlockDeletionTransactionCompleted;
Expand All @@ -95,7 +95,10 @@ public final class ScmBlockDeletingServiceMetrics implements MetricsSource {
@Metric(about = "The number of dataNodes of delete transactions.")
private MutableGaugeLong numBlockDeletionTransactionDataNodes;

private final Map<DatanodeID, DatanodeCommandCounts> numCommandsDatanode = new ConcurrentHashMap<>();
@Metric(about = "Total blocks sent to DN for deletion.")
private MutableGaugeLong numBlockAddedForDeletionToDN;

private final Map<DatanodeID, DatanodeCommandDetails> numCommandsDatanode = new ConcurrentHashMap<>();

private ScmBlockDeletingServiceMetrics() {
this.registry = new MetricsRegistry(SOURCE_NAME);
Expand Down Expand Up @@ -132,16 +135,16 @@ public void incrBlockDeletionCommandFailure() {
this.numBlockDeletionCommandFailure.incr();
}

public void incrBlockDeletionTransactionSent(long count) {
this.numBlockDeletionTransactionSent.incr(count);
public void incrBlockDeletionTransactionsOnDatanodes(long count) {
this.numBlockDeletionTransactionsOnDatanodes.incr(count);
}

public void incrBlockDeletionTransactionFailure() {
this.numBlockDeletionTransactionFailure.incr();
public void incrBlockDeletionTransactionFailureOnDatanodes() {
this.numBlockDeletionTransactionFailureOnDatanodes.incr();
}

public void incrBlockDeletionTransactionSuccess() {
this.numBlockDeletionTransactionSuccess.incr();
public void incrBlockDeletionTransactionSuccessOnDatanodes() {
this.numBlockDeletionTransactionSuccessOnDatanodes.incr();
}

public void incrBlockDeletionTransactionCompleted(long count) {
Expand All @@ -165,25 +168,34 @@ public void setNumBlockDeletionTransactionDataNodes(long dataNodes) {
}

public void incrDNCommandsSent(DatanodeID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandDetails())
.incrCommandsSent(delta);
}

public void incrDNCommandsSuccess(DatanodeID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandDetails())
.incrCommandsSuccess(delta);
}

public void incrDNCommandsFailure(DatanodeID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandDetails())
.incrCommandsFailure(delta);
}

public void incrDNCommandsTimeout(DatanodeID id, long delta) {
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandCounts())
numCommandsDatanode.computeIfAbsent(id, k -> new DatanodeCommandDetails())
.incrCommandsTimeout(delta);
}

/**
 * Adds {@code delta} to the blocks-sent tally tracked for the given datanode,
 * creating the per-datanode entry first if none exists yet.
 *
 * @param id    datanode whose entry is updated
 * @param delta amount to add to that datanode's blocks-sent tally
 */
public void incrNumBlockDeletionSentDN(DatanodeID id, long delta) {
  final DatanodeCommandDetails details =
      this.numCommandsDatanode.computeIfAbsent(id, key -> new DatanodeCommandDetails());
  details.incrBlocksSent(delta);
}

/**
 * Increases the {@code numBlockAddedForDeletionToDN} metric by {@code count}.
 *
 * @param count amount to add to the metric
 */
public void incrTotalBlockSentToDNForDeletion(long count) {
  numBlockAddedForDeletionToDN.incr(count);
}

/**
 * Returns the current value of the {@code numBlockDeletionCommandSent} metric.
 *
 * @return the value reported by {@code numBlockDeletionCommandSent.value()}
 */
public long getNumBlockDeletionCommandSent() {
return numBlockDeletionCommandSent.value();
}
Expand All @@ -196,16 +208,16 @@ public long getBNumBlockDeletionCommandFailure() {
return numBlockDeletionCommandFailure.value();
}

public long getNumBlockDeletionTransactionSent() {
return numBlockDeletionTransactionSent.value();
public long getNumBlockDeletionTransactionsOnDatanodes() {
return numBlockDeletionTransactionsOnDatanodes.value();
}

public long getNumBlockDeletionTransactionFailure() {
return numBlockDeletionTransactionFailure.value();
public long getNumBlockDeletionTransactionFailureOnDatanodes() {
return numBlockDeletionTransactionFailureOnDatanodes.value();
}

public long getNumBlockDeletionTransactionSuccess() {
return numBlockDeletionTransactionSuccess.value();
public long getNumBlockDeletionTransactionSuccessOnDatanodes() {
return numBlockDeletionTransactionSuccessOnDatanodes.value();
}

public long getNumBlockDeletionTransactionCompleted() {
Expand Down Expand Up @@ -234,40 +246,44 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
numBlockDeletionCommandSent.snapshot(builder, all);
numBlockDeletionCommandSuccess.snapshot(builder, all);
numBlockDeletionCommandFailure.snapshot(builder, all);
numBlockDeletionTransactionSent.snapshot(builder, all);
numBlockDeletionTransactionSuccess.snapshot(builder, all);
numBlockDeletionTransactionFailure.snapshot(builder, all);
numBlockDeletionTransactionsOnDatanodes.snapshot(builder, all);
numBlockDeletionTransactionSuccessOnDatanodes.snapshot(builder, all);
numBlockDeletionTransactionFailureOnDatanodes.snapshot(builder, all);
numBlockDeletionTransactionCompleted.snapshot(builder, all);
numBlockDeletionTransactionCreated.snapshot(builder, all);
numSkippedTransactions.snapshot(builder, all);
numProcessedTransactions.snapshot(builder, all);
numBlockDeletionTransactionDataNodes.snapshot(builder, all);
numBlockAddedForDeletionToDN.snapshot(builder, all);

MetricsRecordBuilder recordBuilder = builder;
for (Map.Entry<DatanodeID, DatanodeCommandCounts> e : numCommandsDatanode.entrySet()) {
for (Map.Entry<DatanodeID, DatanodeCommandDetails> e : numCommandsDatanode.entrySet()) {
recordBuilder = recordBuilder.endRecord().addRecord(SOURCE_NAME)
.add(new MetricsTag(Interns.info("datanode",
"Datanode host for deletion commands"), e.getKey().toString()))
.addGauge(DatanodeCommandCounts.COMMANDS_SENT_TO_DN,
.addGauge(DatanodeCommandDetails.COMMANDS_SENT_TO_DN,
e.getValue().getCommandsSent())
.addGauge(DatanodeCommandCounts.COMMANDS_SUCCESSFUL_EXECUTION_BY_DN,
.addGauge(DatanodeCommandDetails.COMMANDS_SUCCESSFUL_EXECUTION_BY_DN,
e.getValue().getCommandsSuccess())
.addGauge(DatanodeCommandCounts.COMMANDS_FAILED_EXECUTION_BY_DN,
.addGauge(DatanodeCommandDetails.COMMANDS_FAILED_EXECUTION_BY_DN,
e.getValue().getCommandsFailure())
.addGauge(DatanodeCommandCounts.COMMANDS_TIMEOUT_BY_DN,
e.getValue().getCommandsTimeout());
.addGauge(DatanodeCommandDetails.COMMANDS_TIMEOUT_BY_DN,
e.getValue().getCommandsTimeout())
// Publish the per-datanode blocks-sent tally under BlocksSent; the timeout
// count is already reported by the CommandsTimeout gauge.
.addGauge(DatanodeCommandDetails.BLOCKS_SENT_TO_DN_COMMAND,
e.getValue().blocksSent);
}
recordBuilder.endRecord();
}

/**
* Class contains metrics related to the ScmBlockDeletingService for each datanode.
*/
public static final class DatanodeCommandCounts {
public static final class DatanodeCommandDetails {
private long commandsSent;
private long commandsSuccess;
private long commandsFailure;
private long commandsTimeout;
private long blocksSent;

private static final MetricsInfo COMMANDS_SENT_TO_DN = Interns.info(
"CommandsSent",
Expand All @@ -281,14 +297,18 @@ public static final class DatanodeCommandCounts {

private static final MetricsInfo COMMANDS_TIMEOUT_BY_DN = Interns.info(
"CommandsTimeout",
"Number of commands timeout from SCM to DN"
);
"Number of commands timeout from SCM to DN");

private static final MetricsInfo BLOCKS_SENT_TO_DN_COMMAND = Interns.info(
"BlocksSent",
"Number of blocks sent to DN in a command for deletion.");

public DatanodeCommandCounts() {
public DatanodeCommandDetails() {
this.commandsSent = 0;
this.commandsSuccess = 0;
this.commandsFailure = 0;
this.commandsTimeout = 0;
this.blocksSent = 0;
}

public void incrCommandsSent(long delta) {
Expand All @@ -307,6 +327,10 @@ public void incrCommandsTimeout(long delta) {
this.commandsTimeout += delta;
}

/**
 * Adds {@code delta} to this entry's blocks-sent tally.
 *
 * @param delta amount to add
 */
public void incrBlocksSent(long delta) {
  blocksSent = blocksSent + delta;
}

/**
 * Returns the commands-sent tally held by this entry.
 *
 * @return the current value of {@code commandsSent}
 */
public long getCommandsSent() {
return commandsSent;
}
Expand All @@ -326,29 +350,29 @@ public long getCommandsTimeout() {
@Override
public String toString() {
return "Sent=" + commandsSent + ", Success=" + commandsSuccess + ", Failed=" + commandsFailure +
", Timeout=" + commandsTimeout;
", Timeout=" + commandsTimeout + ", BlocksSent = " + blocksSent;
}
}

public long getNumCommandsDatanodeSent() {
long sent = 0;
for (DatanodeCommandCounts v : numCommandsDatanode.values()) {
for (DatanodeCommandDetails v : numCommandsDatanode.values()) {
sent += v.commandsSent;
}
return sent;
}

public long getNumCommandsDatanodeSuccess() {
long successCount = 0;
for (DatanodeCommandCounts v : numCommandsDatanode.values()) {
for (DatanodeCommandDetails v : numCommandsDatanode.values()) {
successCount += v.commandsSuccess;
}
return successCount;
}

public long getNumCommandsDatanodeFailed() {
long failCount = 0;
for (DatanodeCommandCounts v : numCommandsDatanode.values()) {
for (DatanodeCommandDetails v : numCommandsDatanode.values()) {
failCount += v.commandsFailure;
}
return failCount;
Expand All @@ -363,9 +387,14 @@ public String toString() {
.append("numBlockDeletionCommandSent = ").append(numBlockDeletionCommandSent.value()).append('\t')
.append("numBlockDeletionCommandSuccess = ").append(numBlockDeletionCommandSuccess.value()).append('\t')
.append("numBlockDeletionCommandFailure = ").append(numBlockDeletionCommandFailure.value()).append('\t')
.append("numBlockDeletionTransactionSent = ").append(numBlockDeletionTransactionSent.value()).append('\t')
.append("numBlockDeletionTransactionSuccess = ").append(numBlockDeletionTransactionSuccess.value()).append('\t')
.append("numBlockDeletionTransactionFailure = ").append(numBlockDeletionTransactionFailure.value()).append('\t')
.append("numBlockDeletionTransactionsOnDatanodes = ").append(numBlockDeletionTransactionsOnDatanodes.value())
.append('\t')
.append("numBlockDeletionTransactionSuccessOnDatanodes = ")
.append(numBlockDeletionTransactionSuccessOnDatanodes.value()).append('\t')
.append("numBlockDeletionTransactionFailureOnDatanodes = ")
.append(numBlockDeletionTransactionFailureOnDatanodes.value()).append('\t')
.append("numBlockAddedForDeletionToDN = ")
.append(numBlockAddedForDeletionToDN.value()).append('\t')
.append("numDeletionCommandsPerDatanode = ").append(numCommandsDatanode);
return buffer.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testCall() throws Exception {
metrics.getNumBlockDeletionCommandSent());
// Echo Command has one Transaction
assertEquals(datanodeDetails.size() * 1,
metrics.getNumBlockDeletionTransactionSent());
metrics.getNumBlockDeletionTransactionsOnDatanodes());
}

private void callDeletedBlockTransactionScanner() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,7 @@
"targets": [
{
"editorMode": "code",
"expr": "(scm_block_deleting_service_num_block_deletion_transaction_success / clamp_min(scm_block_deleting_service_num_block_deletion_transaction_sent, 1)) * 100",
"expr": "(scm_block_deleting_service_num_block_deletion_transaction_success_on_datanodes / clamp_min(scm_block_deleting_service_num_block_deletion_transactions_on_datanodes, 1)) * 100",
"legendFormat": "{{hostname}}",
"range": true,
"refId": "A"
Expand Down Expand Up @@ -3262,7 +3262,7 @@
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "scm_block_deleting_service_num_block_deletion_transaction_sent",
"expr": "scm_block_deleting_service_num_block_deletion_transactions_on_datanodes",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "{{hostname}}",
Expand Down Expand Up @@ -3544,7 +3544,7 @@
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "scm_block_deleting_service_num_block_deletion_transaction_failure",
"expr": "scm_block_deleting_service_num_block_deletion_transaction_failure_on_datanodes",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
Expand Down
Loading
Loading