Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,11 @@ private void processCmd(DeleteCmdInfo cmd) {
DeletedContainerBlocksSummary summary =
DeletedContainerBlocksSummary.getFrom(containerBlocks);
LOG.info("Summary of deleting container blocks, numOfTransactions={}, "
+ "numOfContainers={}, numOfBlocks={}",
+ "numOfContainers={}, numOfBlocks={}, commandId={}.",
summary.getNumOfTxs(),
summary.getNumOfContainers(),
summary.getNumOfBlocks());
summary.getNumOfBlocks(),
cmd.getCmd().getId());
if (LOG.isDebugEnabled()) {
LOG.debug("Start to delete container blocks, TXIDs={}",
summary.getTxIDSummary());
Expand All @@ -384,7 +385,8 @@ private void processCmd(DeleteCmdInfo cmd) {
LOG.debug("Sending following block deletion ACK to SCM");
for (DeleteBlockTransactionResult result : blockDeletionACK
.getResultsList()) {
LOG.debug("{} : {}", result.getTxID(), result.getSuccess());
LOG.debug("TxId = {} : ContainerId = {} : {}",
result.getTxID(), result.getContainerID(), result.getSuccess());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
blocksDeleted += tx.getLocalIDCount();
if (SCMBlockDeletingService.LOG.isDebugEnabled()) {
SCMBlockDeletingService.LOG
.debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
.debug("Transaction added: {} <- TX({}), DN {} <- blocksDeleted Add {}.",
dnID, tx.getTxID(), dnID, tx.getLocalIDCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
Expand Down Expand Up @@ -200,20 +199,6 @@ private DeletedBlocksTransaction constructNewTransaction(
.build();
}

/**
 * Inspects one transaction result taken from a block deletion ACK and
 * reports whether it is marked as unsuccessful.
 *
 * <p>The result is always traced at debug level (when enabled); an
 * unsuccessful result is additionally logged as a warning.
 *
 * @param result the per-transaction result to inspect
 * @return {@code true} if {@code result.getSuccess()} is {@code false},
 *         {@code false} otherwise
 */
private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Got block deletion ACK from datanode, TXIDs={}, success={}",
        result.getTxID(), result.getSuccess());
  }

  // Successful ACK: nothing more to report.
  if (result.getSuccess()) {
    return false;
  }

  LOG.warn("Got failed ACK for TXID={}, prepare to resend the TX in next interval",
      result.getTxID());
  return true;
}

@Override
public int getNumOfValidTransactions() throws IOException {
lock.lock();
Expand Down Expand Up @@ -300,26 +285,46 @@ private void getTransaction(DeletedBlocksTransaction tx,
.setCount(transactionStatusManager.getOrDefaultRetryCount(
tx.getTxID(), 0))
.build();

for (ContainerReplica replica : replicas) {
DatanodeDetails details = replica.getDatanodeDetails();
if (!dnList.contains(details)) {
continue;
}
if (!transactionStatusManager.isDuplication(
details, updatedTxn.getTxID(), commandStatus)) {
transactions.addTransactionToDN(details.getUuid(), updatedTxn);
metrics.incrProcessedTransaction();
}
}
}

/**
 * Decides whether the given delete transaction should be held back for its
 * container.
 *
 * <p>The transaction is held back (the method returns {@code true}) when
 * either at least one replica of the container lives on a datanode that is
 * not contained in {@code dnList}, or the replication manager reports a
 * health state other than {@code HEALTHY} for the container.
 *
 * @param replicas replicas of the container referenced by {@code txn}
 * @param txn the delete transaction being evaluated
 * @param dnList datanodes against which every replica's datanode is checked
 * @return {@code true} if the transaction should be held back,
 *         {@code false} otherwise
 * @throws ContainerNotFoundException presumably when the container lookup
 *         for {@code txn.getContainerID()} fails -- TODO confirm
 */
private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
DeletedBlocksTransaction txn) throws ContainerNotFoundException {
DeletedBlocksTransaction txn,
Set<DatanodeDetails> dnList) throws ContainerNotFoundException {
// The health result is computed up front, before the dnList check below,
// even when that check ends up returning early.
ContainerInfo containerInfo = containerManager
.getContainer(ContainerID.valueOf(txn.getContainerID()));
ReplicationManager replicationManager =
scmContext.getScm().getReplicationManager();
ContainerHealthResult result = replicationManager
.getContainerReplicationHealth(containerInfo, replicas);

// Require every replica of the container to be hosted on a datanode in
// dnList before the transaction is handed out.
// Intent: all datanodes holding the container receive the delete command
// in the same round and acknowledge it to SCM at roughly the same time,
// which helps ACK confirmation and deletion speed.
// This avoids deletion getting blocked because some replicas of a
// container were deleted while other replicas never received the delete
// command.
long containerId = txn.getContainerID();
for (ContainerReplica replica : replicas) {
DatanodeDetails datanodeDetails = replica.getDatanodeDetails();
if (!dnList.contains(datanodeDetails)) {
// NOTE(review): dnDetail repeats the lookup already stored in
// datanodeDetails above; the existing local could be reused.
DatanodeDetails dnDetail = replica.getDatanodeDetails();
LOG.debug("Skip Container = {}, because DN = {} is not in dnList.",
containerId, dnDetail.getUuid());
return true;
}
}

// All replicas are on datanodes in dnList; hold back only if the container
// is not reported as HEALTHY.
return result.getHealthState() != ContainerHealthResult.HealthState.HEALTHY;
}

Expand All @@ -345,6 +350,7 @@ public DatanodeDeletedBlockTransactions getTransactions(
.getCommandStatusByTxId(dnList.stream().
map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
// Here takes block replica count as the threshold to avoid the case
// that part of replicas committed the TXN and recorded in the
// SCMDeletedBlockTransactionStatusManager, while they are counted
Expand All @@ -358,23 +364,25 @@ public DatanodeDeletedBlockTransactions getTransactions(
// HDDS-7126. When container is under replicated, it is possible
// that container is deleted, but transactions are not deleted.
if (containerManager.getContainer(id).isDeleted()) {
LOG.warn("Container: " + id + " was deleted for the " +
"transaction: " + txn);
LOG.warn("Container: {} was deleted for the " +
"transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
} else if (txn.getCount() > -1 && txn.getCount() <= maxRetry
&& !containerManager.getContainer(id).isOpen()) {
Set<ContainerReplica> replicas = containerManager
.getContainerReplicas(
ContainerID.valueOf(txn.getContainerID()));
if (checkInadequateReplica(replicas, txn)) {
if (checkInadequateReplica(replicas, txn, dnList)) {
metrics.incrSkippedTransaction();
continue;
}
getTransaction(
txn, transactions, dnList, replicas, commandStatus);
} else if (txn.getCount() >= maxRetry || containerManager.getContainer(id).isOpen()) {
metrics.incrSkippedTransaction();
}
} catch (ContainerNotFoundException ex) {
LOG.warn("Container: " + id + " was not found for the transaction: "
+ txn);
LOG.warn("Container: {} was not found for the transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ public EmptyTaskResult call() throws Exception {
}
}
LOG.info("Totally added {} blocks to be deleted for"
+ " {} datanodes, task elapsed time: {}ms",
+ " {} datanodes / {} totalnodes, task elapsed time: {}ms",
transactions.getBlocksDeleted(),
transactions.getDatanodeTransactionMap().size(),
included.size(),
Time.monotonicNow() - startTime);
deletedBlockLog.incrementCount(new ArrayList<>(processedTxIDs));
} catch (NotLeaderException nle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

/**
* Metrics related to Block Deleting Service running in SCM.
Expand Down Expand Up @@ -76,6 +77,15 @@ public final class ScmBlockDeletingServiceMetrics {
@Metric(about = "The number of created txs which are added into DB.")
private MutableCounterLong numBlockDeletionTransactionCreated;

@Metric(about = "The number of skipped transactions")
private MutableCounterLong numSkippedTransactions;

@Metric(about = "The number of processed transactions")
private MutableCounterLong numProcessedTransactions;

@Metric(about = "The number of dataNodes of delete transactions.")
private MutableGaugeLong numBlockDeletionTransactionDataNodes;

/**
 * Private constructor: code outside this class cannot instantiate it
 * directly.
 */
private ScmBlockDeletingServiceMetrics() {
}

Expand Down Expand Up @@ -130,6 +140,18 @@ public void incrBlockDeletionTransactionCreated(long count) {
this.numBlockDeletionTransactionCreated.incr(count);
}

/**
 * Increments the {@code numSkippedTransactions} counter
 * ("The number of skipped transactions").
 */
public void incrSkippedTransaction() {
this.numSkippedTransactions.incr();
}

/**
 * Increments the {@code numProcessedTransactions} counter
 * ("The number of processed transactions").
 */
public void incrProcessedTransaction() {
this.numProcessedTransactions.incr();
}

/**
 * Sets the {@code numBlockDeletionTransactionDataNodes} gauge to the given
 * value, replacing whatever value it held before.
 *
 * @param dataNodes the value to store in the gauge
 */
public void setNumBlockDeletionTransactionDataNodes(long dataNodes) {
this.numBlockDeletionTransactionDataNodes.set(dataNodes);
}

/**
 * @return the current value of the {@code numBlockDeletionCommandSent}
 *         metric
 */
public long getNumBlockDeletionCommandSent() {
return numBlockDeletionCommandSent.value();
}
Expand Down Expand Up @@ -162,6 +184,18 @@ public long getNumBlockDeletionTransactionCreated() {
return numBlockDeletionTransactionCreated.value();
}

/**
 * @return the current value of the {@code numSkippedTransactions} counter
 */
public long getNumSkippedTransactions() {
return numSkippedTransactions.value();
}

/**
 * @return the current value of the {@code numProcessedTransactions} counter
 */
public long getNumProcessedTransactions() {
return numProcessedTransactions.value();
}

/**
 * @return the current value of the
 *         {@code numBlockDeletionTransactionDataNodes} gauge
 */
public long getNumBlockDeletionTransactionDataNodes() {
return numBlockDeletionTransactionDataNodes.value();
}

@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
Expand Down