diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java index f6ac0a4872cc..d99b59171597 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeature.java @@ -42,7 +42,8 @@ public enum HDDSLayoutFeature implements LayoutFeature { "to DatanodeDetails."), HBASE_SUPPORT(8, "Datanode RocksDB Schema Version 3 has an extra table " + "for the last chunk of blocks to support HBase.)"), - WITNESSED_CONTAINER_DB_PROTO_VALUE(9, "ContainerID table schema to use value type as proto"); + WITNESSED_CONTAINER_DB_PROTO_VALUE(9, "ContainerID table schema to use value type as proto"), + STORAGE_SPACE_DISTRIBUTION(10, "Enhanced block deletion function for storage space distribution feature."); ////////////////////////////// ////////////////////////////// diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 7677ed58707f..fbf1f46a9702 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto; @@ -411,6 +412,12 @@ StartContainerBalancerResponseProto startContainerBalancer( */ void transferLeadership(String newLeaderId) throws IOException; + /** + * Get deleted block summary. + * @throws IOException + */ + DeletedBlocksTransactionSummary getDeletedBlockSummary() throws IOException; + /** * Get usage information of datanode by address or uuid. * diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 56411453fc8e..92ddfa7eb8dc 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.protocol; +import jakarta.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Collections; @@ -31,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto; @@ -361,6 +363,14 @@ List getFailedDeletedBlockTxn(int count, @Deprecated int resetDeletedBlockRetryCount(List txIDs) throws IOException; + + /** + * Get deleted block 
summary. + * @throws IOException + */ + @Nullable + DeletedBlocksTransactionSummary getDeletedBlockSummary() throws IOException; + /** * Check if SCM is in safe mode. * diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 2a85e6e40071..502d9a4fe98f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import jakarta.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -42,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.GetScmInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipRequestProto; @@ -78,6 +80,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeResponseProto; +import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetDeletedBlocksTxnSummaryRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetDeletedBlocksTxnSummaryResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetExistContainerWithPipelinesInBatchRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetMetricsRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetMetricsResponseProto; @@ -801,6 +805,18 @@ public int resetDeletedBlockRetryCount(List txIDs) return 0; } + @Nullable + @Override + public DeletedBlocksTransactionSummary getDeletedBlockSummary() throws IOException { + GetDeletedBlocksTxnSummaryRequestProto request = + GetDeletedBlocksTxnSummaryRequestProto.newBuilder().build(); + ScmContainerLocationResponse scmContainerLocationResponse = submitRequest(Type.GetDeletedBlocksTransactionSummary, + builder -> builder.setGetDeletedBlocksTxnSummaryRequest(request)); + GetDeletedBlocksTxnSummaryResponseProto response = + scmContainerLocationResponse.getGetDeletedBlocksTxnSummaryResponse(); + return response.hasSummary() ? response.getSummary() : null; + } + /** * Check if SCM is in safe mode. 
* diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index 3dfdea4c7324..f80a50a3be97 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -86,6 +86,7 @@ message ScmContainerLocationRequest { optional GetMetricsRequestProto getMetricsRequest = 47; optional ContainerBalancerStatusInfoRequestProto containerBalancerStatusInfoRequest = 48; optional ReconcileContainerRequestProto reconcileContainerRequest = 49; + optional GetDeletedBlocksTxnSummaryRequestProto getDeletedBlocksTxnSummaryRequest = 50; } message ScmContainerLocationResponse { @@ -143,6 +144,7 @@ message ScmContainerLocationResponse { optional GetMetricsResponseProto getMetricsResponse = 47; optional ContainerBalancerStatusInfoResponseProto containerBalancerStatusInfoResponse = 48; optional ReconcileContainerResponseProto reconcileContainerResponse = 49; + optional GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummaryResponse = 50; enum Status { OK = 1; @@ -199,6 +201,7 @@ enum Type { GetMetrics = 43; GetContainerBalancerStatusInfo = 44; ReconcileContainer = 45; + GetDeletedBlocksTransactionSummary = 46; } /** @@ -545,6 +548,13 @@ message ResetDeletedBlockRetryCountResponseProto { required int32 resetCount = 1; } +message GetDeletedBlocksTxnSummaryRequestProto { +} + +message GetDeletedBlocksTxnSummaryResponseProto { + optional DeletedBlocksTransactionSummary summary = 1; +} + message FinalizeScmUpgradeRequestProto { required string upgradeClientId = 1; } diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 504f1a7ebdf1..31f8cc1fee02 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -514,6 +514,13 @@ message DeletedBlocksTransactionInfo { 
optional int32 count = 4; } +message DeletedBlocksTransactionSummary { + optional uint64 totalTransactionCount = 1; + optional uint64 totalBlockCount = 2; + optional uint64 totalBlockSize = 3; + optional uint64 totalBlockReplicatedSize = 4; +} + message CompactionFileInfoProto { optional string fileName = 1; optional string startKey = 2; diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index e48ed4d1c595..bb07a5146e6a 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -369,6 +369,8 @@ message DeletedBlocksTransaction { // the retry time of sending deleting command to datanode. // We don't have to store the retry count in DB. optional int32 count = 4 [deprecated=true]; + optional uint64 totalBlockSize = 5; + optional uint64 totalBlockReplicatedSize = 6; } // ACK message datanode sent to SCM, contains the result of diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 3bf60b06e7c5..6b0136abf664 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -93,13 +93,13 @@ public BlockManagerImpl(final ConfigurationSource conf, this.writableContainerFactory = scm.getWritableContainerFactory(); mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this); - metrics = ScmBlockDeletingServiceMetrics.create(); + metrics = ScmBlockDeletingServiceMetrics.create(this); // SCM block deleting transaction log and deleting service. 
deletedBlockLog = new DeletedBlockLogImpl(conf, scm, scm.getContainerManager(), - scm.getScmHAManager().getDBTransactionBuffer(), + scm.getScmHAManager().asSCMHADBTransactionBuffer(), metrics); @@ -220,9 +220,7 @@ public void deleteBlocks(List keyBlocksInfoList) throw new SCMException("SafeModePrecheck failed for deleteBlocks", SCMException.ResultCodes.SAFE_MODE_EXCEPTION); } - Map> containerBlocks = new HashMap<>(); - // TODO: track the block size info so that we can reclaim the container - // TODO: used space when the block is deleted. + Map> containerBlocks = new HashMap<>(); for (BlockGroup bg : keyBlocksInfoList) { if (LOG.isDebugEnabled()) { LOG.debug("Deleting blocks {}", @@ -232,10 +230,10 @@ public void deleteBlocks(List keyBlocksInfoList) BlockID block = deletedBlock.getBlockID(); long containerID = block.getContainerID(); if (containerBlocks.containsKey(containerID)) { - containerBlocks.get(containerID).add(block.getLocalID()); + containerBlocks.get(containerID).add(deletedBlock); } else { - List item = new ArrayList<>(); - item.add(block.getLocalID()); + List item = new ArrayList<>(); + item.add(deletedBlock); containerBlocks.put(containerID, item); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java index b1283ef773c9..63ab44de346a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java @@ -17,6 +17,8 @@ package org.apache.hadoop.hdds.scm.block; +import com.google.protobuf.ByteString; +import jakarta.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.List; @@ -24,8 +26,10 @@ import java.util.Set; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; +import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.common.DeletedBlock; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; /** @@ -100,7 +104,7 @@ void recordTransactionCreated( * @param containerBlocksMap a map of containerBlocks. * @throws IOException */ - void addTransactions(Map> containerBlocksMap) + void addTransactions(Map> containerBlocksMap) throws IOException; /** @@ -115,8 +119,13 @@ void addTransactions(Map> containerBlocksMap) /** * Reinitialize the delete log from the db. * @param deletedBlocksTXTable delete transaction table + * @param statefulConfigTable stateful service config table */ - void reinitialize(Table deletedBlocksTXTable); + void reinitialize(Table deletedBlocksTXTable, + Table statefulConfigTable) throws IOException; int getTransactionToDNsCommitMapSize(); + + @Nullable + DeletedBlocksTransactionSummary getTransactionSummary(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index e457727e1aed..81f6f241e78f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.DEL_TXN_ID; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -36,6 +37,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; 
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; @@ -49,12 +51,15 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; -import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.common.DeletedBlock; +import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,8 +88,7 @@ public class DeletedBlockLogImpl private final SCMContext scmContext; private final SequenceIdGenerator sequenceIdGen; private final ScmBlockDeletingServiceMetrics metrics; - private final SCMDeletedBlockTransactionStatusManager - transactionStatusManager; + private SCMDeletedBlockTransactionStatusManager transactionStatusManager; private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis(); private long lastProcessedTransactionId = -1; @@ -94,24 +98,25 @@ public class DeletedBlockLogImpl public DeletedBlockLogImpl(ConfigurationSource conf, StorageContainerManager scm, ContainerManager 
containerManager, - DBTransactionBuffer dbTxBuffer, - ScmBlockDeletingServiceMetrics metrics) { + SCMHADBTransactionBuffer dbTxBuffer, + ScmBlockDeletingServiceMetrics metrics) throws IOException { this.containerManager = containerManager; this.lock = new ReentrantLock(); this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl .newBuilder() - .setConfiguration(conf) .setDeletedBlocksTable(scm.getScmMetadataStore().getDeletedBlocksTXTable()) .setContainerManager(containerManager) .setRatisServer(scm.getScmHAManager().getRatisServer()) .setSCMDBTransactionBuffer(dbTxBuffer) + .setStatefulConfigTable(scm.getScmMetadataStore().getStatefulServiceConfigTable()) .build(); this.scmContext = scm.getScmContext(); this.sequenceIdGen = scm.getSequenceIdGen(); this.metrics = metrics; this.transactionStatusManager = new SCMDeletedBlockTransactionStatusManager(deletedBlockLogStateManager, + scm.getScmMetadataStore().getStatefulServiceConfigTable(), containerManager, metrics, scmCommandTimeoutMs); int limit = (int) conf.getStorageSize( ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, @@ -125,7 +130,7 @@ public DeletedBlockLogImpl(ConfigurationSource conf, } @VisibleForTesting - void setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) { + public void setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) { this.deletedBlockLogStateManager = manager; } @@ -133,6 +138,10 @@ void setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) { void setDeleteBlocksFactorPerDatanode(int deleteBlocksFactorPerDatanode) { this.deletionFactorPerDatanode = deleteBlocksFactorPerDatanode; } + + public DeletedBlockLogStateManager getDeletedBlockLogStateManager() { + return deletedBlockLogStateManager; + } /** * {@inheritDoc} @@ -147,13 +156,23 @@ public void incrementCount(List txIDs) } private DeletedBlocksTransaction constructNewTransaction( - long txID, long containerID, List blocks) { - return DeletedBlocksTransaction.newBuilder() + 
long txID, long containerID, List blocks) { + List localIdList = blocks.stream().map(b -> b.getBlockID().getLocalID()).collect(Collectors.toList()); + DeletedBlocksTransaction.Builder builder = DeletedBlocksTransaction.newBuilder() .setTxID(txID) .setContainerID(containerID) - .addAllLocalID(blocks) - .setCount(0) - .build(); + .addAllLocalID(localIdList) + .setCount(0); + + if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION)) { + long replicatedSize = blocks.stream().mapToLong(DeletedBlock::getReplicatedSize).sum(); + // even when HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION is finalized, old OM can still call the old API + if (replicatedSize >= 0) { + builder.setTotalBlockReplicatedSize(replicatedSize); + builder.setTotalBlockSize(blocks.stream().mapToLong(DeletedBlock::getSize).sum()); + } + } + return builder.build(); } @Override @@ -176,11 +195,13 @@ public int getNumOfValidTransactions() throws IOException { @Override public void reinitialize( - Table deletedTable) { + Table deletedTable, Table statefulConfigTable) + throws IOException { // we don't need to handle SCMDeletedBlockTransactionStatusManager and // deletedBlockLogStateManager, since they will be cleared // when becoming leader. 
- deletedBlockLogStateManager.reinitialize(deletedTable); + deletedBlockLogStateManager.reinitialize(deletedTable, statefulConfigTable); + transactionStatusManager.reinitialize(statefulConfigTable); } /** @@ -206,13 +227,13 @@ public void onFlush() { * @throws IOException */ @Override - public void addTransactions(Map> containerBlocksMap) + public void addTransactions(Map> containerBlocksMap) throws IOException { lock.lock(); try { ArrayList txsToBeAdded = new ArrayList<>(); long currentBatchSizeBytes = 0; - for (Map.Entry< Long, List< Long > > entry : + for (Map.Entry> entry : containerBlocksMap.entrySet()) { long nextTXID = sequenceIdGen.getNextId(DEL_TXN_ID); DeletedBlocksTransaction tx = constructNewTransaction(nextTXID, @@ -222,14 +243,14 @@ public void addTransactions(Map> containerBlocksMap) currentBatchSizeBytes += txSize; if (currentBatchSizeBytes >= logAppenderQueueByteLimit) { - deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded); + transactionStatusManager.addTransactions(txsToBeAdded); metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size()); txsToBeAdded.clear(); currentBatchSizeBytes = 0; } } if (!txsToBeAdded.isEmpty()) { - deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded); + transactionStatusManager.addTransactions(txsToBeAdded); metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size()); } } finally { @@ -260,6 +281,7 @@ private void getTransaction(DeletedBlocksTransaction tx, if (!transactionStatusManager.isDuplication( datanodeID, tx.getTxID(), commandStatus)) { transactions.addTransactionToDN(datanodeID, tx); + addTxToTxSizeMap(tx); flag = true; } } @@ -300,6 +322,14 @@ private Boolean checkInadequateReplica(Set replicas, return result.getHealthState() != ContainerHealthResult.HealthState.HEALTHY; } + private void addTxToTxSizeMap(DeletedBlocksTransaction tx) { + if (tx.hasTotalBlockReplicatedSize()) { + transactionStatusManager.getTxSizeMap().put(tx.getTxID(), + new 
SCMDeletedBlockTransactionStatusManager.TxBlockInfo(tx.getLocalIDCount(), + tx.getTotalBlockSize(), tx.getTotalBlockReplicatedSize())); + } + } + @Override public DatanodeDeletedBlockTransactions getTransactions( int blockDeletionLimit, Set dnList) @@ -373,6 +403,7 @@ public DatanodeDeletedBlockTransactions getTransactions( if (container.isDeleted()) { LOG.warn("Container: {} was deleted for the transaction: {}.", id, txn); txIDs.add(txn.getTxID()); + addTxToTxSizeMap(txn); } else if (!container.isOpen()) { Set replicas = containerManager .getContainerReplicas( @@ -388,6 +419,7 @@ public DatanodeDeletedBlockTransactions getTransactions( } catch (ContainerNotFoundException ex) { LOG.warn("Container: {} was not found for the transaction: {}.", id, txn); txIDs.add(txn.getTxID()); + addTxToTxSizeMap(txn); } if (lastProcessedTransactionId == keyValue.getKey()) { @@ -411,6 +443,7 @@ public DatanodeDeletedBlockTransactions getTransactions( deletedBlockLogStateManager.removeTransactionsFromDB(txIDs); getSCMDeletedBlockTransactionStatusManager().removeTransactionFromDNsCommitMap(txIDs); getSCMDeletedBlockTransactionStatusManager().removeTransactionFromDNsRetryCountMap(txIDs); + transactionStatusManager.removeTransactions(txIDs); metrics.incrBlockDeletionTransactionCompleted(txIDs.size()); } } @@ -430,6 +463,11 @@ public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) { return transactionStatusManager; } + @VisibleForTesting + public void setSCMDeletedBlockTransactionStatusManager(SCMDeletedBlockTransactionStatusManager manager) { + this.transactionStatusManager = manager; + } + @Override public void recordTransactionCreated(DatanodeID dnId, long scmCmdId, Set dnTxSet) { @@ -442,6 +480,11 @@ public int getTransactionToDNsCommitMapSize() { return getSCMDeletedBlockTransactionStatusManager().getTransactionToDNsCommitMapSize(); } + @Override + public DeletedBlocksTransactionSummary getTransactionSummary() { + return transactionStatusManager.getTransactionSummary(); + } + 
@Override public void onDatanodeDead(DatanodeID dnId) { getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java index 060b07bbdf93..f22718ce9ef2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManager.java @@ -17,8 +17,10 @@ package org.apache.hadoop.hdds.scm.block; +import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.metadata.Replicate; import org.apache.hadoop.hdds.utils.db.Table; @@ -29,7 +31,14 @@ */ public interface DeletedBlockLogStateManager { @Replicate - void addTransactionsToDB(ArrayList txs) + void addTransactionsToDB(ArrayList txs, + DeletedBlocksTransactionSummary summary) throws IOException; + + @Replicate + void addTransactionsToDB(ArrayList txs) throws IOException; + + @Replicate + void removeTransactionsFromDB(ArrayList txIDs, DeletedBlocksTransactionSummary summary) throws IOException; @Replicate @@ -49,7 +58,10 @@ int resetRetryCountOfTransactionInDB(ArrayList txIDs) Table.KeyValueIterator getReadOnlyIterator() throws IOException; + ArrayList getTransactionsFromDB(ArrayList txIDs) throws IOException; + void onFlush(); - void reinitialize(Table deletedBlocksTXTable); + void reinitialize(Table deletedBlocksTXTable, + Table statefulConfigTable); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java index b6976c3c3f38..7ef9708ac584 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java @@ -18,20 +18,22 @@ package org.apache.hadoop.hdds.scm.block; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer; import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; -import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer; import org.apache.hadoop.hdds.utils.db.CodecException; import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; import org.apache.hadoop.hdds.utils.db.Table; @@ -50,17 +52,20 @@ public class DeletedBlockLogStateManagerImpl LoggerFactory.getLogger(DeletedBlockLogStateManagerImpl.class); private Table deletedTable; + private Table statefulConfigTable; private ContainerManager containerManager; - private final DBTransactionBuffer transactionBuffer; + private final SCMHADBTransactionBuffer transactionBuffer; private final Set deletingTxIDs; + public static final String SERVICE_NAME = 
DeletedBlockLogStateManager.class.getSimpleName(); - public DeletedBlockLogStateManagerImpl(ConfigurationSource conf, - Table deletedTable, - ContainerManager containerManager, DBTransactionBuffer txBuffer) { + public DeletedBlockLogStateManagerImpl(Table deletedTable, + Table statefulServiceConfigTable, + ContainerManager containerManager, SCMHADBTransactionBuffer txBuffer) { this.deletedTable = deletedTable; this.containerManager = containerManager; this.transactionBuffer = txBuffer; this.deletingTxIDs = ConcurrentHashMap.newKeySet(); + this.statefulConfigTable = statefulServiceConfigTable; } @Override @@ -139,8 +144,8 @@ public void removeFromDB() { } @Override - public void addTransactionsToDB(ArrayList txs) - throws IOException { + public void addTransactionsToDB(ArrayList txs, + DeletedBlocksTransactionSummary summary) throws IOException { Map containerIdToTxnIdMap = new HashMap<>(); for (DeletedBlocksTransaction tx : txs) { long tid = tx.getTxID(); @@ -148,11 +153,24 @@ public void addTransactionsToDB(ArrayList txs) (k, v) -> v != null && v > tid ? v : tid); transactionBuffer.addToBuffer(deletedTable, tx.getTxID(), tx); } + transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME, summary.toByteString()); containerManager.updateDeleteTransactionId(containerIdToTxnIdMap); } @Override - public void removeTransactionsFromDB(ArrayList txIDs) + public void addTransactionsToDB(ArrayList txs) throws IOException { + Map containerIdToTxnIdMap = new HashMap<>(); + for (DeletedBlocksTransaction tx : txs) { + long tid = tx.getTxID(); + containerIdToTxnIdMap.compute(ContainerID.valueOf(tx.getContainerID()), + (k, v) -> v != null && v > tid ? 
v : tid); + transactionBuffer.addToBuffer(deletedTable, tx.getTxID(), tx); + } + containerManager.updateDeleteTransactionId(containerIdToTxnIdMap); + } + + @Override + public void removeTransactionsFromDB(ArrayList txIDs, DeletedBlocksTransactionSummary summary) throws IOException { if (deletingTxIDs != null) { deletingTxIDs.addAll(txIDs); @@ -160,6 +178,17 @@ public void removeTransactionsFromDB(ArrayList txIDs) for (Long txID : txIDs) { transactionBuffer.removeFromBuffer(deletedTable, txID); } + transactionBuffer.addToBuffer(statefulConfigTable, SERVICE_NAME, summary.toByteString()); + } + + @Override + public void removeTransactionsFromDB(ArrayList txIDs) throws IOException { + if (deletingTxIDs != null) { + deletingTxIDs.addAll(txIDs); + } + for (Long txID : txIDs) { + transactionBuffer.removeFromBuffer(deletedTable, txID); + } } @Deprecated @@ -181,6 +210,31 @@ public int resetRetryCountOfTransactionInDB(ArrayList txIDs) return 0; } + @Override + public ArrayList getTransactionsFromDB(ArrayList txIDs) throws IOException { + Objects.requireNonNull(txIDs, "txIds cannot be null."); + ArrayList transactions = new ArrayList<>(); + for (long txId: txIDs) { + try { + if (deletingTxIDs.contains(txId)) { + LOG.debug("txId {} is already in deletingTxIDs.", txId); + continue; + } + DeletedBlocksTransaction transaction = deletedTable.get(txId); + if (transaction == null) { + LOG.debug("txId {} is not found in deletedTable.", txId); + continue; + } + transactions.add(transaction); + } catch (IOException ex) { + LOG.error("Could not get deleted block transaction {}.", txId, ex); + throw ex; + } + } + LOG.debug("Get {} DeletedBlocksTransactions for {} input txIDs", transactions.size(), txIDs.size()); + return transactions; + } + @Override public void onFlush() { // onFlush() can be invoked only when ratis is enabled. 
@@ -190,7 +244,7 @@ public void onFlush() { @Override public void reinitialize( - Table deletedBlocksTXTable) { + Table deletedBlocksTXTable, Table configTable) { // Before Reinitialization, flush will be called from Ratis StateMachine. // Just the DeletedDb will be loaded here. @@ -199,6 +253,7 @@ public void reinitialize( // before reinitialization. Just update deletedTable here. Preconditions.checkArgument(deletingTxIDs.isEmpty()); this.deletedTable = deletedBlocksTXTable; + this.statefulConfigTable = configTable; } public static Builder newBuilder() { @@ -209,16 +264,11 @@ public static Builder newBuilder() { * Builder for ContainerStateManager. */ public static class Builder { - private ConfigurationSource conf; private SCMRatisServer scmRatisServer; - private Table table; - private DBTransactionBuffer transactionBuffer; + private Table deletedBlocksTransactionTable; + private SCMHADBTransactionBuffer transactionBuffer; private ContainerManager containerManager; - - public Builder setConfiguration(final ConfigurationSource config) { - conf = config; - return this; - } + private Table statefulServiceConfigTable; public Builder setRatisServer(final SCMRatisServer ratisServer) { scmRatisServer = ratisServer; @@ -227,11 +277,11 @@ public Builder setRatisServer(final SCMRatisServer ratisServer) { public Builder setDeletedBlocksTable( final Table deletedBlocksTable) { - table = deletedBlocksTable; + deletedBlocksTransactionTable = deletedBlocksTable; return this; } - public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) { + public Builder setSCMDBTransactionBuffer(SCMHADBTransactionBuffer buffer) { this.transactionBuffer = buffer; return this; } @@ -241,12 +291,16 @@ public Builder setContainerManager(ContainerManager contManager) { return this; } - public DeletedBlockLogStateManager build() { - Preconditions.checkNotNull(conf); - Preconditions.checkNotNull(table); + public Builder setStatefulConfigTable(final Table table) { + 
this.statefulServiceConfigTable = table; + return this; + } + + public DeletedBlockLogStateManager build() throws IOException { + Preconditions.checkNotNull(deletedBlocksTransactionTable); final DeletedBlockLogStateManager impl = new DeletedBlockLogStateManagerImpl( - conf, table, containerManager, transactionBuffer); + deletedBlocksTransactionTable, statefulServiceConfigTable, containerManager, transactionBuffer); return scmRatisServer.getProxyHandler(RequestType.BLOCK, DeletedBlockLogStateManager.class, impl); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java index f6136322f630..3be3417799e1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hdds.scm.block; import static java.lang.Math.min; +import static org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManagerImpl.SERVICE_NAME; import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus; import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT; import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import jakarta.annotation.Nullable; import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -36,15 +39,22 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +71,9 @@ public class SCMDeletedBlockTransactionStatusManager { private final Map> transactionToDNsCommitMap; // Maps txId to its retry counts; private final Map transactionToRetryCountMap; + // an in-memory map caching the size of each transaction sent to a DN. + private Map txSizeMap; + // The access to DeletedBlocksTXTable is protected by // DeletedBlockLogStateManager.
private final DeletedBlockLogStateManager deletedBlockLogStateManager; @@ -68,6 +81,20 @@ public class SCMDeletedBlockTransactionStatusManager { private final ScmBlockDeletingServiceMetrics metrics; private final long scmCommandTimeoutMs; + private Table statefulConfigTable; + public static final HddsProtos.DeletedBlocksTransactionSummary EMPTY_SUMMARY = + HddsProtos.DeletedBlocksTransactionSummary.newBuilder() + .setTotalTransactionCount(0) + .setTotalBlockCount(0) + .setTotalBlockSize(0) + .setTotalBlockReplicatedSize(0) + .build(); + private final AtomicLong totalTxCount = new AtomicLong(0); + private final AtomicLong totalBlockCount = new AtomicLong(0); + private final AtomicLong totalBlocksSize = new AtomicLong(0); + private final AtomicLong totalReplicatedBlocksSize = new AtomicLong(0); + private static boolean disableDataDistributionForTest; + /** * Before the DeletedBlockTransaction is executed on DN and reported to * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}. @@ -80,17 +107,21 @@ public class SCMDeletedBlockTransactionStatusManager { public SCMDeletedBlockTransactionStatusManager( DeletedBlockLogStateManager deletedBlockLogStateManager, + Table statefulServiceConfigTable, ContainerManager containerManager, - ScmBlockDeletingServiceMetrics metrics, long scmCommandTimeoutMs) { + ScmBlockDeletingServiceMetrics metrics, long scmCommandTimeoutMs) throws IOException { // maps transaction to dns which have committed it. 
this.deletedBlockLogStateManager = deletedBlockLogStateManager; + this.statefulConfigTable = statefulServiceConfigTable; this.metrics = metrics; this.containerManager = containerManager; this.scmCommandTimeoutMs = scmCommandTimeoutMs; this.transactionToDNsCommitMap = new ConcurrentHashMap<>(); this.transactionToRetryCountMap = new ConcurrentHashMap<>(); + this.txSizeMap = new ConcurrentHashMap<>(); this.scmDeleteBlocksCommandStatusManager = new SCMDeleteBlocksCommandStatusManager(metrics); + this.initDataDistributionData(); } /** @@ -392,6 +423,7 @@ public void clear() { transactionToRetryCountMap.clear(); scmDeleteBlocksCommandStatusManager.clear(); transactionToDNsCommitMap.clear(); + txSizeMap.clear(); } public void cleanAllTimeoutSCMCommand(long timeoutMs) { @@ -415,6 +447,51 @@ private boolean alreadyExecuted(DatanodeID dnId, long txId) { .contains(dnId); } + @VisibleForTesting + public void addTransactions(ArrayList txList) throws IOException { + if (txList.isEmpty()) { + return; + } + if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION) && + !disableDataDistributionForTest) { + for (DeletedBlocksTransaction tx: txList) { + if (tx.hasTotalBlockSize()) { + incrDeletedBlocksSummary(tx); + } + } + deletedBlockLogStateManager.addTransactionsToDB(txList, getSummary()); + return; + } + deletedBlockLogStateManager.addTransactionsToDB(txList); + } + + private void incrDeletedBlocksSummary(DeletedBlocksTransaction tx) { + totalTxCount.addAndGet(1); + totalBlockCount.addAndGet(tx.getLocalIDCount()); + totalBlocksSize.addAndGet(tx.getTotalBlockSize()); + totalReplicatedBlocksSize.addAndGet(tx.getTotalBlockReplicatedSize()); + } + + @VisibleForTesting + public void removeTransactions(ArrayList txIDs) throws IOException { + if (txIDs.isEmpty()) { + return; + } + if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION) && + !disableDataDistributionForTest) { + for (Long txID: txIDs) { + TxBlockInfo 
txBlockInfo = txSizeMap.remove(txID); + if (txBlockInfo != null) { + descDeletedBlocksSummary(txBlockInfo); + } + } + deletedBlockLogStateManager.removeTransactionsFromDB(txIDs, getSummary()); + return; + } + + deletedBlockLogStateManager.removeTransactionsFromDB(txIDs); + } + /** * Commits a transaction means to delete all footprints of a transaction * from the log. This method doesn't guarantee all transactions can be @@ -483,7 +560,7 @@ public void commitTransactions(List transactionRes } } try { - deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted); + removeTransactions(txIDsToBeDeleted); metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size()); } catch (IOException e) { LOG.warn("Could not commit delete block transactions: " @@ -491,6 +568,22 @@ public void commitTransactions(List transactionRes } } + public DeletedBlocksTransactionSummary getSummary() { + return DeletedBlocksTransactionSummary.newBuilder() + .setTotalTransactionCount(totalTxCount.get()) + .setTotalBlockCount(totalBlockCount.get()) + .setTotalBlockSize(totalBlocksSize.get()) + .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get()) + .build(); + } + + private void descDeletedBlocksSummary(TxBlockInfo txBlockInfo) { + totalTxCount.addAndGet(-1); + totalBlockCount.addAndGet(-txBlockInfo.getTotalBlockCount()); + totalBlocksSize.addAndGet(-txBlockInfo.getTotalBlockSize()); + totalReplicatedBlocksSize.addAndGet(-txBlockInfo.getTotalReplicatedBlockSize()); + } + @VisibleForTesting void commitSCMCommandStatus(List deleteBlockStatus, DatanodeID dnId) { processSCMCommandStatus(deleteBlockStatus, dnId); @@ -545,4 +638,93 @@ public void removeTransactionFromDNsCommitMap(List txIds) { public void removeTransactionFromDNsRetryCountMap(List txIds) { txIds.forEach(transactionToRetryCountMap::remove); } + + public void reinitialize(Table configTable) throws IOException { + // DB onFlush() will be called before reinitialization. 
+ this.statefulConfigTable = configTable; + this.initDataDistributionData(); + } + + @VisibleForTesting + public Map getTxSizeMap() { + return txSizeMap; + } + + @VisibleForTesting + public static void setDisableDataDistributionForTest(boolean disabled) { + disableDataDistributionForTest = disabled; + } + + @Nullable + public DeletedBlocksTransactionSummary getTransactionSummary() { + if (!VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION)) { + return null; + } + return DeletedBlocksTransactionSummary.newBuilder() + .setTotalTransactionCount(totalTxCount.get()) + .setTotalBlockCount(totalBlockCount.get()) + .setTotalBlockSize(totalBlocksSize.get()) + .setTotalBlockReplicatedSize(totalReplicatedBlocksSize.get()) + .build(); + } + + private void initDataDistributionData() throws IOException { + if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION)) { + DeletedBlocksTransactionSummary summary = loadDeletedBlocksSummary(); + if (summary != null) { + totalTxCount.set(summary.getTotalTransactionCount()); + totalBlockCount.set(summary.getTotalBlockCount()); + totalBlocksSize.set(summary.getTotalBlockSize()); + totalReplicatedBlocksSize.set(summary.getTotalBlockReplicatedSize()); + LOG.info("Data distribution is enabled with totalBlockCount {} totalBlocksSize {}", + totalBlockCount.get(), totalBlocksSize.get()); + } + } else { + LOG.info(HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION + " is not finalized"); + } + } + + private DeletedBlocksTransactionSummary loadDeletedBlocksSummary() throws IOException { + String propertyName = DeletedBlocksTransactionSummary.class.getSimpleName(); + try { + ByteString byteString = statefulConfigTable.get(SERVICE_NAME); + if (byteString == null) { + // for a new Ozone cluster, property not found is an expected state. + LOG.info("Property {} for service {} not found. 
", propertyName, SERVICE_NAME); + return null; + } + return DeletedBlocksTransactionSummary.parseFrom(byteString); + } catch (IOException e) { + LOG.error("Failed to get property {} for service {}. DataDistribution function will be disabled.", + propertyName, SERVICE_NAME, e); + throw new IOException("Failed to get property " + propertyName, e); + } + } + + /** + * Block size information of a transaction. + */ + public static class TxBlockInfo { + private long totalBlockCount; + private long totalBlockSize; + private long totalReplicatedBlockSize; + + public TxBlockInfo(long blockCount, long blockSize, long replicatedSize) { + this.totalBlockCount = blockCount; + this.totalBlockSize = blockSize; + this.totalReplicatedBlockSize = replicatedSize; + } + + public long getTotalBlockCount() { + return totalBlockCount; + } + + public long getTotalBlockSize() { + return totalBlockSize; + } + + public long getTotalReplicatedBlockSize() { + return totalReplicatedBlockSize; + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java index cbfdddda7ca9..90be231d9ada 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/ScmBlockDeletingServiceMetrics.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -45,6 +46,7 @@ public final class ScmBlockDeletingServiceMetrics implements MetricsSource { public static final String SOURCE_NAME = 
SCMBlockDeletingService.class.getSimpleName(); private final MetricsRegistry registry; + private final BlockManager blockManager; /** * Given all commands are finished and no new coming deletes from OM. @@ -100,15 +102,32 @@ public final class ScmBlockDeletingServiceMetrics implements MetricsSource { private final Map numCommandsDatanode = new ConcurrentHashMap<>(); - private ScmBlockDeletingServiceMetrics() { + private static final MetricsInfo NUM_BLOCK_DELETION_TRANSACTIONS = Interns.info( + "numBlockDeletionTransactions", + "The number of transactions in DB."); + + private static final MetricsInfo NUM_BLOCK_OF_ALL_DELETION_TRANSACTIONS = Interns.info( + "numBlockOfAllDeletionTransactions", + "The number of blocks in all transactions in DB."); + + private static final MetricsInfo BLOCK_SIZE_OF_ALL_DELETION_TRANSACTIONS = Interns.info( + "blockSizeOfAllDeletionTransactions", + "The size of all blocks in all transactions in DB."); + + private static final MetricsInfo REPLICATED_BLOCK_SIZE_OF_ALL_DELETION_TRANSACTIONS = Interns.info( + "replicatedBlockSizeOfAllDeletionTransactions", + "The replicated size of all blocks in all transactions in DB."); + + private ScmBlockDeletingServiceMetrics(BlockManager blockManager) { this.registry = new MetricsRegistry(SOURCE_NAME); + this.blockManager = blockManager; } - public static synchronized ScmBlockDeletingServiceMetrics create() { + public static synchronized ScmBlockDeletingServiceMetrics create(BlockManager blockManager) { if (instance == null) { MetricsSystem ms = DefaultMetricsSystem.instance(); instance = ms.register(SOURCE_NAME, "SCMBlockDeletingService", - new ScmBlockDeletingServiceMetrics()); + new ScmBlockDeletingServiceMetrics(blockManager)); } return instance; @@ -256,6 +275,19 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) { numBlockDeletionTransactionDataNodes.snapshot(builder, all); numBlockAddedForDeletionToDN.snapshot(builder, all); + // add metrics for deleted block transaction 
summary + HddsProtos.DeletedBlocksTransactionSummary summary = blockManager.getDeletedBlockLog().getTransactionSummary(); + if (summary != null) { + builder = builder.endRecord().addRecord(SOURCE_NAME) + .addGauge(NUM_BLOCK_DELETION_TRANSACTIONS, summary.getTotalTransactionCount()); + builder = builder.endRecord().addRecord(SOURCE_NAME) + .addGauge(NUM_BLOCK_OF_ALL_DELETION_TRANSACTIONS, summary.getTotalBlockCount()); + builder = builder.endRecord().addRecord(SOURCE_NAME) + .addGauge(BLOCK_SIZE_OF_ALL_DELETION_TRANSACTIONS, summary.getTotalBlockSize()); + builder = builder.endRecord().addRecord(SOURCE_NAME) + .addGauge(REPLICATED_BLOCK_SIZE_OF_ALL_DELETION_TRANSACTIONS, summary.getTotalBlockReplicatedSize()); + } + MetricsRecordBuilder recordBuilder = builder; for (Map.Entry e : numCommandsDatanode.entrySet()) { recordBuilder = recordBuilder.endRecord().addRecord(SOURCE_NAME) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java index a01effa3a20b..5bb6e01b28de 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMPerformanceMetrics.java @@ -117,5 +117,13 @@ public void updateDeleteKeySuccessBlocks(long keys) { public void updateDeleteKeyFailedBlocks(long keys) { deleteKeyBlocksFailure.incr(keys); } + + public long getDeleteKeySuccessBlocks() { + return deleteKeyBlocksSuccess.value(); + } + + public long getDeleteKeyFailedBlocks() { + return deleteKeyBlocksFailure.value(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java 
index 387b1001c2b1..2df3a68d0de8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java @@ -54,6 +54,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer { private final AtomicLong txFlushPending = new AtomicLong(0); private long lastSnapshotTimeMs = 0; private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private boolean autoFlushEnabled = true; public SCMHADBTransactionBufferImpl(StorageContainerManager scm) throws IOException { @@ -179,7 +180,7 @@ public boolean shouldFlush(long snapshotWaitTime) { rwLock.readLock().lock(); try { long timeDiff = scm.getSystemClock().millis() - lastSnapshotTimeMs; - return txFlushPending.get() > 0 && timeDiff > snapshotWaitTime; + return autoFlushEnabled && txFlushPending.get() > 0 && timeDiff > snapshotWaitTime; } finally { rwLock.readLock().unlock(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index 00915406a4ce..a3f20476dc38 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -445,7 +445,7 @@ public void startServices() throws IOException { scm.getPipelineManager().reinitialize(metadataStore.getPipelineTable()); scm.getContainerManager().reinitialize(metadataStore.getContainerTable()); scm.getScmBlockManager().getDeletedBlockLog().reinitialize( - metadataStore.getDeletedBlocksTXTable()); + metadataStore.getDeletedBlocksTXTable(), metadataStore.getStatefulServiceConfigTable()); scm.getStatefulServiceStateManager().reinitialize( metadataStore.getStatefulServiceConfigTable()); if 
(OzoneSecurityUtil.isSecurityEnabled(conf)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index b9d4b9d6aef5..3b061aa10c01 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -83,6 +83,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainersOnDecomNodeResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetDeletedBlocksTxnSummaryRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetDeletedBlocksTxnSummaryResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetExistContainerWithPipelinesInBatchRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetExistContainerWithPipelinesInBatchResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetFailedDeletedBlocksTxnRequestProto; @@ -712,6 +714,14 @@ public ScmContainerLocationResponse processRequest( getResetDeletedBlockRetryCount( request.getResetDeletedBlockRetryCountRequest())) .build(); + case GetDeletedBlocksTransactionSummary: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + 
.setGetDeletedBlocksTxnSummaryResponse( + getDeletedBlocksTxnSummary( + request.getGetDeletedBlocksTxnSummaryRequest())) + .build(); case TransferLeadership: return ScmContainerLocationResponse.newBuilder() .setCmdType(request.getCmdType()) @@ -1344,6 +1354,18 @@ public GetFailedDeletedBlocksTxnResponseProto getFailedDeletedBlocksTxn( .build(); } + public GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummary( + GetDeletedBlocksTxnSummaryRequestProto request) throws IOException { + HddsProtos.DeletedBlocksTransactionSummary summary = impl.getDeletedBlockSummary(); + if (summary == null) { + return GetDeletedBlocksTxnSummaryResponseProto.newBuilder().build(); + } else { + return GetDeletedBlocksTxnSummaryResponseProto.newBuilder() + .setSummary(summary) + .build(); + } + } + public TransferLeadershipResponseProto transferScmLeadership( TransferLeadershipRequestProto request) throws IOException { String newLeaderId = request.getNewLeaderId(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java index c4089bbca5c1..31230f071d59 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java @@ -24,7 +24,10 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; +import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; +import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.node.NodeManager; import 
org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -59,4 +62,10 @@ public interface OzoneStorageContainerManager { SCMNodeDetails getScmNodeDetails(); ReconfigurationHandler getReconfigurationHandler(); + + SCMMetadataStore getScmMetadataStore(); + + SCMHAManager getScmHAManager(); + + SequenceIdGenerator getSequenceIdGen(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java index 22674042d20a..d2b2b6cbe43c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java @@ -478,4 +478,8 @@ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map txIDs) throws IOException { return 0; } + @Nullable + @Override + public DeletedBlocksTransactionSummary getDeletedBlockSummary() { + final Map auditMap = Maps.newHashMap(); + try { + DeletedBlocksTransactionSummary summary = + scm.getScmBlockManager().getDeletedBlockLog().getTransactionSummary(); + AUDIT.logReadSuccess(buildAuditMessageForSuccess( + SCMAction.GET_DELETED_BLOCK_SUMMARY, auditMap)); + return summary; + } catch (Exception ex) { + AUDIT.logReadFailure(buildAuditMessageForFailure( + SCMAction.GET_DELETED_BLOCK_SUMMARY, auditMap, ex)); + throw ex; + } + } + /** * Check if SCM is in safe mode. 
* diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 4253e7b1c114..6b0d6bb97e60 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -184,6 +184,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.ozone.common.Storage.StorageState; +import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.lease.LeaseManagerNotRunningException; import org.apache.hadoop.ozone.upgrade.DefaultUpgradeFinalizationExecutor; @@ -679,6 +680,7 @@ private void initializeSystemManagers(OzoneConfiguration conf, scmLayoutVersionManager = new HDDSLayoutVersionManager( scmStorageConfig.getLayoutVersion()); + VersionedDatanodeFeatures.initialize(scmLayoutVersionManager); UpgradeFinalizationExecutor finalizationExecutor; @@ -1805,6 +1807,7 @@ public NodeDecommissionManager getScmDecommissionManager() { /** * Returns SCMHAManager. */ + @Override public SCMHAManager getScmHAManager() { return scmHAManager; } @@ -1957,6 +1960,7 @@ public SCMContext getScmContext() { /** * Returns SequenceIdGen. */ + @Override public SequenceIdGenerator getSequenceIdGen() { return sequenceIdGen; } @@ -1995,6 +1999,7 @@ public Map getContainerStateCount() { * Returns the SCM metadata Store. 
* @return SCMMetadataStore */ + @Override public SCMMetadataStore getScmMetadataStore() { return scmMetadataStore; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index 95e13146deed..52cd943c4dbb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -67,7 +67,8 @@ public enum SCMAction implements AuditAction { GET_METRICS, QUERY_NODE, GET_PIPELINE, - RECONCILE_CONTAINER; + RECONCILE_CONTAINER, + GET_DELETED_BLOCK_SUMMARY; @Override public String getAction() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index bc0c5cba4d1a..e3bea80d3e97 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -17,9 +17,12 @@ package org.apache.hadoop.hdds.scm.block; +import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.EMPTY_SUMMARY; +import static org.apache.hadoop.ozone.common.BlockGroup.SIZE_NOT_AVAILABLE; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -44,8 +47,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import 
org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -61,6 +66,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.TxBlockInfo; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -75,6 +81,7 @@ import org.apache.hadoop.hdds.scm.server.SCMConfigurator; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.common.DeletedBlock; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -83,6 +90,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; /** @@ -121,7 +130,9 @@ public void setup() throws Exception { containerTable = scm.getScmMetadataStore().getContainerTable(); scmHADBTransactionBuffer = new SCMHADBTransactionBufferStub(scm.getScmMetadataStore().getStore()); - metrics = mock(ScmBlockDeletingServiceMetrics.class); + BlockManager blockManager = mock(BlockManager.class); + when(blockManager.getDeletedBlockLog()).thenReturn(deletedBlockLog); + metrics = 
ScmBlockDeletingServiceMetrics.create(blockManager); deletedBlockLog = new DeletedBlockLogImpl(conf, scm, containerManager, @@ -198,34 +209,36 @@ private void updateContainerMetadata(long cid, @AfterEach public void tearDown() throws Exception { + ScmBlockDeletingServiceMetrics.unRegister(); deletedBlockLog.close(); scm.stop(); scm.join(); } - private Map> generateData(int dataSize) throws IOException { + private Map> generateData(int dataSize) throws IOException { return generateData(dataSize, HddsProtos.LifeCycleState.CLOSED); } - private Map> generateData(int dataSize, + private Map> generateData(int txCount, HddsProtos.LifeCycleState state) throws IOException { - Map> blockMap = new HashMap<>(); - int continerIDBase = RandomUtils.secure().randomInt(0, 100); + Map> blockMap = new HashMap<>(); + long continerIDBase = RandomUtils.secure().randomLong(0, 100); int localIDBase = RandomUtils.secure().randomInt(0, 1000); - for (int i = 0; i < dataSize; i++) { + long blockSize = 1024 * 1024 * 64; + for (int i = 0; i < txCount; i++) { + List blocks = new ArrayList<>(); long containerID = continerIDBase + i; updateContainerMetadata(containerID, state); - List blocks = new ArrayList<>(); for (int j = 0; j < BLOCKS_PER_TXN; j++) { long localID = localIDBase + j; - blocks.add(localID); + blocks.add(new DeletedBlock(new BlockID(containerID, localID), blockSize + j, blockSize + j)); } blockMap.put(containerID, blocks); } return blockMap; } - private void addTransactions(Map> containerBlocksMap, + private void addTransactions(Map> containerBlocksMap, boolean shouldFlush) throws IOException { deletedBlockLog.addTransactions(containerBlocksMap); if (shouldFlush) { @@ -338,15 +351,15 @@ private void mockContainerHealthResult(Boolean healthy) { public void testAddTransactionsIsBatched() throws Exception { conf.setStorageSize(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, 1, StorageUnit.KB); - DeletedBlockLogStateManager mockStateManager = 
mock(DeletedBlockLogStateManager.class); + SCMDeletedBlockTransactionStatusManager mockStatusManager = mock(SCMDeletedBlockTransactionStatusManager.class); DeletedBlockLogImpl log = new DeletedBlockLogImpl(conf, scm, containerManager, scmHADBTransactionBuffer, metrics); - log.setDeletedBlockLogStateManager(mockStateManager); + log.setSCMDeletedBlockTransactionStatusManager(mockStatusManager); - Map> containerBlocksMap = generateData(100); + Map> containerBlocksMap = generateData(100); log.addTransactions(containerBlocksMap); - verify(mockStateManager, atLeast(2)).addTransactionsToDB(any()); + verify(mockStatusManager, atLeast(2)).addTransactions(any()); } @Test @@ -576,7 +589,7 @@ public void testFailedAndTimeoutSCMCommandCanBeResend() throws Exception { @Test public void testDNOnlyOneNodeHealthy() throws Exception { - Map> deletedBlocks = generateData(50); + Map> deletedBlocks = generateData(50); addTransactions(deletedBlocks, true); mockContainerHealthResult(false); DatanodeDeletedBlockTransactions transactions @@ -588,12 +601,12 @@ public void testDNOnlyOneNodeHealthy() throws Exception { @Test public void testInadequateReplicaCommit() throws Exception { - Map> deletedBlocks = generateData(50); + Map> deletedBlocks = generateData(50); addTransactions(deletedBlocks, true); long containerID; // let the first 30 container only consisting of only two unhealthy replicas int count = 0; - for (Map.Entry> entry : deletedBlocks.entrySet()) { + for (Map.Entry> entry : deletedBlocks.entrySet()) { containerID = entry.getKey(); mockInadequateReplicaUnhealthyContainerInfo(containerID, count); count += 1; @@ -695,9 +708,9 @@ public void testDeletedBlockTransactions() throws IOException { long containerID; // Creates {TXNum} TX in the log. 
- Map> deletedBlocks = generateData(txNum); + Map> deletedBlocks = generateData(txNum); addTransactions(deletedBlocks, true); - for (Map.Entry> entry :deletedBlocks.entrySet()) { + for (Map.Entry> entry :deletedBlocks.entrySet()) { count++; containerID = entry.getKey(); // let the container replication factor to be ONE @@ -717,10 +730,11 @@ public void testDeletedBlockTransactions() throws IOException { // add two transactions for same container containerID = blocks.get(0).getContainerID(); - Map> deletedBlocksMap = new HashMap<>(); + Map> deletedBlocksMap = new HashMap<>(); long localId = RandomUtils.secure().randomLong(); - deletedBlocksMap.put(containerID, new LinkedList<>( - Collections.singletonList(localId))); + List blockIDList = new ArrayList<>(); + blockIDList.add(new DeletedBlock(new BlockID(containerID, localId), SIZE_NOT_AVAILABLE, SIZE_NOT_AVAILABLE)); + deletedBlocksMap.put(containerID, blockIDList); addTransactions(deletedBlocksMap, true); blocks = getTransactions(txNum * BLOCKS_PER_TXN * ONE); // Only newly added Blocks will be sent, as previously sent transactions @@ -747,7 +761,7 @@ public void testGetTransactionsWithMaxBlocksPerDatanode(int maxAllowedBlockNum) DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1); // Creates {TXNum} TX in the log. - Map> deletedBlocks = generateData(txNum); + Map> deletedBlocks = generateData(txNum); addTransactions(deletedBlocks, true); List containerIds = new ArrayList<>(deletedBlocks.keySet()); for (int i = 0; i < containerIds.size(); i++) { @@ -778,7 +792,7 @@ public void testDeletedBlockTransactionsOfDeletedContainer() throws IOException List blocks; // Creates {TXNum} TX in the log. 
- Map> deletedBlocks = generateData(txNum, + Map> deletedBlocks = generateData(txNum, HddsProtos.LifeCycleState.DELETED); addTransactions(deletedBlocks, true); @@ -787,6 +801,130 @@ public void testDeletedBlockTransactionsOfDeletedContainer() throws IOException assertEquals(0, blocks.size()); } + @ParameterizedTest + @ValueSource(ints = {1, 10, 25, 50, 100}) + public void testTransactionSerializedSize(int blockCount) { + long txID = 10000000; + long containerID = 1000000; + List blocks = new ArrayList<>(); + for (int i = 0; i < blockCount; i++) { + blocks.add(new DeletedBlock(new BlockID(containerID, 100000000 + i), 128 * 1024 * 1024, 128 * 1024 * 1024)); + } + List localIdList = blocks.stream().map(b -> b.getBlockID().getLocalID()).collect(Collectors.toList()); + DeletedBlocksTransaction tx1 = DeletedBlocksTransaction.newBuilder() + .setTxID(txID) + .setContainerID(containerID) + .addAllLocalID(localIdList) + .setCount(0) + .setTotalBlockSize(blocks.stream().mapToLong(DeletedBlock::getSize).sum()) + .setTotalBlockReplicatedSize(blocks.stream().mapToLong(DeletedBlock::getReplicatedSize).sum()) + .build(); + DeletedBlocksTransaction tx2 = DeletedBlocksTransaction.newBuilder() + .setTxID(txID) + .setContainerID(containerID) + .addAllLocalID(localIdList) + .setCount(0) + .build(); + /* + * 1 blocks tx with totalBlockSize size is 26 + * 1 blocks tx without totalBlockSize size is 16 + * 10 blocks tx with totalBlockSize size is 73 + * 10 blocks tx without totalBlockSize size is 61 + * 25 blocks tx with totalBlockSize size is 148 + * 25 blocks tx without totalBlockSize size is 136 + * 50 blocks tx with totalBlockSize size is 273 + * 50 blocks tx without totalBlockSize size is 261 + * 100 blocks tx with totalBlockSize size is 523 + * 100 blocks tx without totalBlockSize size is 511 + */ + System.out.println(blockCount + " blocks tx with totalBlockSize size is " + tx1.getSerializedSize()); + System.out.println(blockCount + " blocks tx without totalBlockSize size is " + 
tx2.getSerializedSize()); + } + + public static Stream values() { + return Stream.of( + arguments(100, false), + arguments(100, true), + arguments(1000, false), + arguments(1000, true), + arguments(10000, false), + arguments(10000, true), + arguments(100000, false), + arguments(100000, true) + ); + } + + @ParameterizedTest + @MethodSource("values") + public void testAddRemoveTransactionPerformance(int txCount, boolean dataDistributionFinalized) + throws Exception { + Map> data = generateData(txCount); + SCMDeletedBlockTransactionStatusManager statusManager = + deletedBlockLog.getSCMDeletedBlockTransactionStatusManager(); + HddsProtos.DeletedBlocksTransactionSummary summary = statusManager.getTransactionSummary(); + assertEquals(EMPTY_SUMMARY, summary); + + SCMDeletedBlockTransactionStatusManager.setDisableDataDistributionForTest(!dataDistributionFinalized); + long startTime = System.nanoTime(); + deletedBlockLog.addTransactions(data); + scmHADBTransactionBuffer.flush(); + /** + * Before DataDistribution is enabled + * - 979 ms to add 100 txs to DB + * - 275 ms to add 1000 txs to DB + * - 1106 ms to add 10000 txs to DB + * - 11103 ms to add 100000 txs to DB + * After DataDistribution is enabled + * - 908 ms to add 100 txs to DB + * - 351 ms to add 1000 txs to DB + * - 2875 ms to add 10000 txs to DB + * - 12446 ms to add 100000 txs to DB + */ + System.out.println((System.nanoTime() - startTime) / 100000 + " ms to add " + txCount + " txs to DB, " + + "dataDistributionFinalized " + dataDistributionFinalized); + summary = statusManager.getTransactionSummary(); + if (dataDistributionFinalized) { + assertEquals(txCount, summary.getTotalTransactionCount()); + } else { + assertEquals(0, summary.getTotalTransactionCount()); + } + + ArrayList txIdList = data.keySet().stream().collect(Collectors.toCollection(ArrayList::new)); + + if (dataDistributionFinalized) { + Map txSizeMap = statusManager.getTxSizeMap(); + for (Map.Entry> entry : data.entrySet()) { + List deletedBlockList 
= entry.getValue(); + TxBlockInfo txBlockInfo = new TxBlockInfo(deletedBlockList.size(), + deletedBlockList.stream().map(DeletedBlock::getSize).reduce(0L, Long::sum), + deletedBlockList.stream().map(DeletedBlock::getReplicatedSize).reduce(0L, Long::sum)); + txSizeMap.put(entry.getKey(), txBlockInfo); + } + } + startTime = System.nanoTime(); + statusManager.removeTransactions(txIdList); + scmHADBTransactionBuffer.flush(); + /** + * Before DataDistribution is enabled + * - 19 ms to remove 100 txs from DB + * - 26 ms to remove 1000 txs from DB + * - 142 ms to remove 10000 txs from DB + * - 2571 ms to remove 100000 txs from DB + * After DataDistribution is enabled (all cache miss) + * - 62 ms to remove 100 txs from DB + * - 186 ms to remove 1000 txs from DB + * - 968 ms to remove 10000 txs from DB + * - 8635 ms to remove 100000 txs from DB + * After DataDistribution is enabled (all cache hit) + * - 40 ms to remove 100 txs from DB + * - 112 ms to remove 1000 txs from DB + * - 412 ms to remove 10000 txs from DB + * - 3499 ms to remove 100000 txs from DB + */ + System.out.println((System.nanoTime() - startTime) / 100000 + " ms to remove " + txCount + " txs from DB, " + + "dataDistributionFinalized " + dataDistributionFinalized); + } + private void mockStandAloneContainerInfo(long containerID, DatanodeDetails dd) throws IOException { List dns = Collections.singletonList(dd); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java index bc60c8c4ff28..de4b13e5b7d0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java @@ -72,7 +72,7 @@ public void setup() throws Exception { nodeManager = mock(NodeManager.class); eventPublisher = 
mock(EventPublisher.class); conf = new OzoneConfiguration(); - metrics = ScmBlockDeletingServiceMetrics.create(); + metrics = ScmBlockDeletingServiceMetrics.create(mock(BlockManager.class)); when(nodeManager.getTotalDatanodeCommandCount(any(), any())).thenReturn(0); SCMServiceManager scmServiceManager = mock(SCMServiceManager.class); diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index 133166dec487..61c0f4150c34 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto; @@ -184,7 +185,7 @@ public void createContainer(XceiverClientSpi client, // creation state. 
if (LOG.isDebugEnabled()) { LOG.debug("Created container {} machines {}", containerId, - client.getPipeline().getNodes()); + client.getPipeline().getNodes()); } } @@ -210,7 +211,7 @@ public ContainerWithPipeline createContainer(ReplicationConfig replicationConfig XceiverClientSpi client = null; XceiverClientManager clientManager = getXceiverClientManager(); try { - ContainerWithPipeline containerWithPipeline = + ContainerWithPipeline containerWithPipeline = storageContainerLocationClient.allocateContainer(replicationConfig, owner); Pipeline pipeline = containerWithPipeline.getPipeline(); // connect to pipeline leader and allocate container on leader datanode. @@ -396,8 +397,7 @@ public ContainerDataProto readContainer(long containerID, } } - public Map - readContainerFromAllNodes(long containerID, Pipeline pipeline) + public Map readContainerFromAllNodes(long containerID, Pipeline pipeline) throws IOException, InterruptedException { XceiverClientManager clientManager = getXceiverClientManager(); String encodedToken = getEncodedContainerToken(containerID); @@ -434,8 +434,7 @@ public ContainerWithPipeline getContainerWithPipeline(long containerId) } @Override - public List - getContainerReplicas(long containerId) throws IOException { + public List getContainerReplicas(long containerId) throws IOException { List protos = storageContainerLocationClient.getContainerReplicas(containerId, ClientVersion.CURRENT_VERSION); @@ -550,6 +549,11 @@ public void transferLeadership(String newLeaderId) throws IOException { storageContainerLocationClient.transferLeadership(newLeaderId); } + @Override + public DeletedBlocksTransactionSummary getDeletedBlockSummary() throws IOException { + return storageContainerLocationClient.getDeletedBlockSummary(); + } + @Override public List getDatanodeUsageInfo( String address, String uuid) throws IOException { diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/DeletedBlocksTxnCommands.java 
b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/DeletedBlocksTxnCommands.java new file mode 100644 index 000000000000..b816cee2d7b6 --- /dev/null +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/DeletedBlocksTxnCommands.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.admin.scm; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import picocli.CommandLine; + +/** + * Subcommand to group container related operations. 
+ */ +@CommandLine.Command( + name = "deletedBlocksTxn", + description = "SCM deleted blocks transaction specific operations", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class, + subcommands = { + GetDeletedBlockSummarySubcommand.class, + }) +public class DeletedBlocksTxnCommands { + +} diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/GetDeletedBlockSummarySubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/GetDeletedBlockSummarySubcommand.java new file mode 100644 index 000000000000..088daea4e37b --- /dev/null +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/GetDeletedBlockSummarySubcommand.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.admin.scm; + +import java.io.IOException; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.cli.ScmSubcommand; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine; + +/** + * Handler of getting deleted blocks summary from SCM side. 
+ */ +@CommandLine.Command( + name = "summary", + description = "get DeletedBlocksTransaction summary", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class GetDeletedBlockSummarySubcommand extends ScmSubcommand { + + @Override + public void execute(ScmClient client) throws IOException { + HddsProtos.DeletedBlocksTransactionSummary summary = client.getDeletedBlockSummary(); + if (summary == null) { + System.out.println("DeletedBlocksTransaction summary is not available"); + } else { + System.out.println("DeletedBlocksTransaction summary:"); + System.out.println(" Total number of transactions: " + + summary.getTotalTransactionCount()); + System.out.println(" Total number of blocks: " + + summary.getTotalBlockCount()); + System.out.println(" Total size of blocks: " + + summary.getTotalBlockSize()); + System.out.println(" Total replicated size of blocks: " + + summary.getTotalBlockReplicatedSize()); + } + } +} diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/ScmAdmin.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/ScmAdmin.java index 0228eb221be8..a4782cd3a743 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/ScmAdmin.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/ozone/admin/scm/ScmAdmin.java @@ -37,7 +37,8 @@ FinalizationScmStatusSubcommand.class, TransferScmLeaderSubCommand.class, DecommissionScmSubcommand.class, - RotateKeySubCommand.class + RotateKeySubCommand.class, + DeletedBlocksTxnCommands.class }) @MetaInfServices(AdminSubcommand.class) public class ScmAdmin implements AdminSubcommand { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java index e407e30bffc6..7276dc871eac 100644 --- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hdds.scm.HddsTestUtils.mockRemoteUser; import static org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils.setInternalState; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.common.BlockGroup.SIZE_NOT_AVAILABLE; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -64,11 +65,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; @@ -118,6 +121,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneTestUtils; import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.common.DeletedBlock; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; @@ -267,7 +271,7 @@ private void testBlockDeletionTransactions(MiniOzoneCluster cluster) throws Exce OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(), cluster.getStorageContainerManager()); } - Map> containerBlocks = 
createDeleteTXLog( + Map> containerBlocks = createDeleteTXLog( cluster.getStorageContainerManager(), delLog, keyLocations, cluster); @@ -285,10 +289,12 @@ private void testBlockDeletionTransactions(MiniOzoneCluster cluster) throws Exce // but unknown block IDs. for (Long containerID : containerBlocks.keySet()) { // Add 2 TXs per container. - Map> deletedBlocks = new HashMap<>(); - List blocks = new ArrayList<>(); - blocks.add(RandomUtils.secure().randomLong()); - blocks.add(RandomUtils.secure().randomLong()); + Map> deletedBlocks = new HashMap<>(); + List blocks = new ArrayList<>(); + blocks.add(new DeletedBlock(new BlockID(containerID, RandomUtils.secure().randomLong()), + SIZE_NOT_AVAILABLE, SIZE_NOT_AVAILABLE)); + blocks.add(new DeletedBlock(new BlockID(containerID, RandomUtils.secure().randomLong()), + SIZE_NOT_AVAILABLE, SIZE_NOT_AVAILABLE)); deletedBlocks.put(containerID, blocks); addTransactions(cluster.getStorageContainerManager(), delLog, deletedBlocks); @@ -464,7 +470,7 @@ public void testBlockDeletingThrottling() throws Exception { } } - private Map> createDeleteTXLog( + private Map> createDeleteTXLog( StorageContainerManager scm, DeletedBlockLog delLog, Map keyLocations, MiniOzoneCluster cluster) @@ -489,17 +495,17 @@ private Map> createDeleteTXLog( getAllBlocks(cluster, containerNames).size()); // Create a deletion TX for each key. 
- Map> containerBlocks = Maps.newHashMap(); + Map> containerBlocks = Maps.newHashMap(); for (OmKeyInfo info : keyLocations.values()) { List list = info.getLatestVersionLocations().getLocationList(); list.forEach(location -> { if (containerBlocks.containsKey(location.getContainerID())) { containerBlocks.get(location.getContainerID()) - .add(location.getBlockID().getLocalID()); + .add(new DeletedBlock(location.getBlockID(), SIZE_NOT_AVAILABLE, SIZE_NOT_AVAILABLE)); } else { - List blks = Lists.newArrayList(); - blks.add(location.getBlockID().getLocalID()); + List blks = Lists.newArrayList(); + blks.add(new DeletedBlock(location.getBlockID(), SIZE_NOT_AVAILABLE, SIZE_NOT_AVAILABLE)); containerBlocks.put(location.getContainerID(), blks); } }); @@ -875,7 +881,7 @@ public void testIncrementalContainerReportQueue() throws Exception { private void addTransactions(StorageContainerManager scm, DeletedBlockLog delLog, - Map> containerBlocksMap) + Map> containerBlocksMap) throws IOException, TimeoutException { delLog.addTransactions(containerBlocksMap); scm.getScmHAManager().asSCMHADBTransactionBuffer().flush(); @@ -908,9 +914,9 @@ public List getAllBlocks(MiniOzoneCluster cluster, Long containerID) throw } public boolean verifyBlocksWithTxnTable(MiniOzoneCluster cluster, - Map> containerBlocks) + Map> containerBlocks) throws IOException { - for (Map.Entry> entry : containerBlocks.entrySet()) { + for (Map.Entry> entry : containerBlocks.entrySet()) { KeyValueContainerData cData = getContainerMetadata(cluster, entry.getKey()); try (DBHandle db = BlockUtils.getDB(cData, cluster.getConf())) { DatanodeStore ds = db.getStore(); @@ -925,7 +931,9 @@ public boolean verifyBlocksWithTxnTable(MiniOzoneCluster cluster, txnsInTxnTable) { conID.addAll(txn.getValue().getLocalIDList()); } - if (!conID.equals(containerBlocks.get(entry.getKey()))) { + List localIDList = containerBlocks.get(entry.getKey()).stream() + .map(b -> b.getBlockID().getLocalID()).collect(Collectors.toList()); + if 
(!conID.equals(localIDList)) { return false; } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmDataDistributionFinalization.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmDataDistributionFinalization.java new file mode 100644 index 000000000000..d443fc7f37e1 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmDataDistributionFinalization.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.upgrade; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT; +import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE; +import static org.apache.hadoop.hdds.client.ReplicationType.RATIS; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; +import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.EMPTY_SUMMARY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.common.BlockGroup.SIZE_NOT_AVAILABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; +import 
org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager; +import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer; +import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.scm.server.SCMConfigurator; +import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint; +import org.apache.hadoop.hdds.scm.server.upgrade.SCMUpgradeFinalizationContext; +import org.apache.hadoop.hdds.utils.db.CodecException; +import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.UniformDatanodesFactory; +import org.apache.hadoop.ozone.client.BucketArgs; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.common.DeletedBlock; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests upgrade finalization failure scenarios and corner cases specific to SCM data distribution feature. 
+ */ +public class TestScmDataDistributionFinalization { + private static final String CLIENT_ID = UUID.randomUUID().toString(); + private static final Logger LOG = + LoggerFactory.getLogger(TestScmDataDistributionFinalization.class); + + private StorageContainerLocationProtocol scmClient; + private MiniOzoneHAClusterImpl cluster; + private static final int NUM_DATANODES = 3; + private static final int NUM_SCMS = 3; + private Future finalizationFuture; + private final String volumeName = UUID.randomUUID().toString(); + private final String bucketName = UUID.randomUUID().toString(); + private OzoneBucket bucket; + private static final long BLOCK_SIZE = 1024 * 1024; // 1 MB + private static final long BLOCKS_PER_TX = 5; // 1 MB + + public void init(OzoneConfiguration conf, + UpgradeFinalizationExecutor executor, boolean doFinalize) throws Exception { + + SCMConfigurator configurator = new SCMConfigurator(); + configurator.setUpgradeFinalizationExecutor(executor); + + conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY, HDDSLayoutFeature.HBASE_SUPPORT.layoutVersion()); + conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 100, TimeUnit.MILLISECONDS); + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + scmConfig.setBlockDeletionInterval(Duration.ofMillis(100)); + conf.setFromObject(scmConfig); + conf.set(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0s"); + + DatanodeConfiguration dnConf = + conf.getObject(DatanodeConfiguration.class); + dnConf.setBlockDeletionInterval(Duration.ofMillis(100)); + conf.setFromObject(dnConf); + + MiniOzoneHAClusterImpl.Builder clusterBuilder = MiniOzoneCluster.newHABuilder(conf); + clusterBuilder.setNumOfStorageContainerManagers(NUM_SCMS) + .setNumOfActiveSCMs(NUM_SCMS) + .setSCMServiceId("scmservice") + .setOMServiceId("omServiceId") 
+ .setNumOfOzoneManagers(1) + .setSCMConfigurator(configurator) + .setNumDatanodes(NUM_DATANODES) + .setDatanodeFactory(UniformDatanodesFactory.newBuilder() + .setLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion()) + .build()); + this.cluster = clusterBuilder.build(); + + scmClient = cluster.getStorageContainerLocationClient(); + cluster.waitForClusterToBeReady(); + assertEquals(HDDSLayoutFeature.HBASE_SUPPORT.layoutVersion(), + cluster.getStorageContainerManager().getLayoutVersionManager().getMetadataLayoutVersion()); + + // Create Volume and Bucket + try (OzoneClient ozoneClient = OzoneClientFactory.getRpcClient(conf)) { + ObjectStore store = ozoneClient.getObjectStore(); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + BucketArgs.Builder builder = BucketArgs.newBuilder(); + volume.createBucket(bucketName, builder.build()); + bucket = volume.getBucket(bucketName); + } + + // Launch finalization from the client. In the current implementation, + // this call will block until finalization completes. If the test + // involves restarts or leader changes the client may be disconnected, + // but finalization should still proceed. + if (doFinalize) { + finalizationFuture = Executors.newSingleThreadExecutor().submit( + () -> { + try { + scmClient.finalizeScmUpgrade(CLIENT_ID); + } catch (IOException ex) { + LOG.info("finalization client failed. This may be expected if the" + + " test injected failures.", ex); + } + }); + } + } + + @AfterEach + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test for an empty cluster. 
+ */ + @Test + public void testFinalizationEmptyClusterDataDistribution() throws Exception { + init(new OzoneConfiguration(), null, true); + assertNull(cluster.getStorageContainerLocationClient().getDeletedBlockSummary()); + + finalizationFuture.get(); + TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID); + // Make sure old leader has caught up and all SCMs have finalized. + waitForScmsToFinalize(cluster.getStorageContainerManagersList()); + assertEquals(HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION.layoutVersion(), + cluster.getStorageContainerManager().getLayoutVersionManager().getMetadataLayoutVersion()); + + TestHddsUpgradeUtils.testPostUpgradeConditionsSCM( + cluster.getStorageContainerManagersList(), 0, NUM_DATANODES); + TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes( + cluster.getHddsDatanodes(), 0, CLOSED); + assertNotNull(cluster.getStorageContainerLocationClient().getDeletedBlockSummary()); + + for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) { + DeletedBlockLogImpl deletedBlockLog = (DeletedBlockLogImpl) scm.getScmBlockManager().getDeletedBlockLog(); + SCMDeletedBlockTransactionStatusManager statusManager = + deletedBlockLog.getSCMDeletedBlockTransactionStatusManager(); + HddsProtos.DeletedBlocksTransactionSummary summary = statusManager.getTransactionSummary(); + assertEquals(EMPTY_SUMMARY, summary); + } + + long lastTxId = findLastTx(); + StorageContainerManager activeSCM = cluster.getActiveSCM(); + assertEquals(-1, lastTxId, "Last transaction ID should be -1"); + + // generate old format deletion tx, summary should keep empty, total DB tx 4 + int txCount = 4; + DeletedBlockLogImpl deletedBlockLog = (DeletedBlockLogImpl) activeSCM.getScmBlockManager().getDeletedBlockLog(); + deletedBlockLog.addTransactions(generateDeletedBlocks(txCount, false)); + flushDBTransactionBuffer(activeSCM); + ArrayList txIdList = getRowsInTable(activeSCM.getScmMetadataStore().getDeletedBlocksTXTable()); + 
assertEquals(txCount, txIdList.size()); + + SCMDeletedBlockTransactionStatusManager statusManager = + deletedBlockLog.getSCMDeletedBlockTransactionStatusManager(); + HddsProtos.DeletedBlocksTransactionSummary summary = statusManager.getTransactionSummary(); + assertEquals(EMPTY_SUMMARY, summary); + statusManager.removeTransactions(txIdList); + + // generate 4 new format deletion tx + Map> txList = generateDeletedBlocks(txCount, true); + deletedBlockLog.addTransactions(txList); + flushDBTransactionBuffer(activeSCM); + + ArrayList txWithSizeList = getRowsInTable(activeSCM.getScmMetadataStore().getDeletedBlocksTXTable()); + assertEquals(txCount, txWithSizeList.size()); + summary = statusManager.getTransactionSummary(); + assertEquals(txCount, summary.getTotalTransactionCount()); + assertEquals(txCount * BLOCKS_PER_TX, summary.getTotalBlockCount()); + assertEquals(txCount * BLOCKS_PER_TX * BLOCK_SIZE, summary.getTotalBlockSize()); + assertEquals(txCount * BLOCKS_PER_TX * BLOCK_SIZE * 3, summary.getTotalBlockReplicatedSize()); + + // wait for all transactions deleted by SCMBlockDeletingService + GenericTestUtils.waitFor(() -> { + try { + flushDBTransactionBuffer(activeSCM); + return getRowsInTable(activeSCM.getScmMetadataStore().getDeletedBlocksTXTable()).isEmpty(); + } catch (IOException e) { + fail("Failed to get keys from DeletedBlocksTXTable", e); + return false; + } + }, 100, 5000); + + // generate old format deletion tx, summary should keep the same + deletedBlockLog.addTransactions(generateDeletedBlocks(txCount, false)); + flushDBTransactionBuffer(activeSCM); + ArrayList txWithoutSizeList = getRowsInTable(activeSCM.getScmMetadataStore().getDeletedBlocksTXTable()); + assertEquals(txCount, txWithoutSizeList.size()); + summary = statusManager.getTransactionSummary(); + assertEquals(EMPTY_SUMMARY, summary); + + // delete old format deletion tx, summary should keep the same + statusManager.removeTransactions(txWithoutSizeList); + flushDBTransactionBuffer(activeSCM); + 
summary = statusManager.getTransactionSummary(); + assertEquals(EMPTY_SUMMARY, summary); + + // delete already deleted new format txs again, summary should become nearly empty + statusManager.removeTransactions(txWithSizeList); + flushDBTransactionBuffer(activeSCM); + summary = statusManager.getTransactionSummary(); + assertEquals(0, summary.getTotalTransactionCount()); + assertEquals(0, summary.getTotalBlockCount()); + assertEquals(0, summary.getTotalBlockSize()); + assertEquals(0, summary.getTotalBlockReplicatedSize()); + } + + /** + * Test for none empty cluster. + */ + @Test + public void testFinalizationNonEmptyClusterDataDistribution() throws Exception { + init(new OzoneConfiguration(), null, false); + // stop SCMBlockDeletingService + for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) { + scm.getScmBlockManager().getSCMBlockDeletingService().stop(); + } + + // write some tx + int txCount = 2; + StorageContainerManager activeSCM = cluster.getActiveSCM(); + activeSCM.getScmBlockManager().getDeletedBlockLog().addTransactions(generateDeletedBlocks(txCount, false)); + flushDBTransactionBuffer(activeSCM); + assertNull(cluster.getStorageContainerLocationClient().getDeletedBlockSummary()); + + finalizationFuture = Executors.newSingleThreadExecutor().submit( + () -> { + try { + scmClient.finalizeScmUpgrade(CLIENT_ID); + } catch (IOException ex) { + LOG.info("finalization client failed. This may be expected if the" + + " test injected failures.", ex); + } + }); + finalizationFuture.get(); + TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID); + // Make sure old leader has caught up and all SCMs have finalized. 
+ waitForScmsToFinalize(cluster.getStorageContainerManagersList()); + assertEquals(HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION.layoutVersion(), + cluster.getStorageContainerManager().getLayoutVersionManager().getMetadataLayoutVersion()); + + TestHddsUpgradeUtils.testPostUpgradeConditionsSCM( + cluster.getStorageContainerManagersList(), 0, NUM_DATANODES); + TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes( + cluster.getHddsDatanodes(), 0, CLOSED); + assertNotNull(cluster.getStorageContainerLocationClient().getDeletedBlockSummary()); + + for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) { + DeletedBlockLogImpl deletedBlockLog = (DeletedBlockLogImpl) scm.getScmBlockManager().getDeletedBlockLog(); + SCMDeletedBlockTransactionStatusManager statusManager = + deletedBlockLog.getSCMDeletedBlockTransactionStatusManager(); + HddsProtos.DeletedBlocksTransactionSummary summary = statusManager.getTransactionSummary(); + assertEquals(EMPTY_SUMMARY, summary); + } + + long lastTxId = findLastTx(); + assertNotEquals(-1, lastTxId, "Last transaction ID should not be -1"); + + final String keyName = "key" + System.nanoTime(); + // Create the key + String value = "sample value"; + TestDataUtil.createKey(bucket, keyName, ReplicationConfig.fromTypeAndFactor(RATIS, THREE), value.getBytes(UTF_8)); + // update scmInfo in OM + OzoneKeyDetails keyDetails = bucket.getKey(keyName); + // delete the key + bucket.deleteKey(keyName); + + DeletedBlockLogImpl deletedBlockLog = (DeletedBlockLogImpl) activeSCM.getScmBlockManager().getDeletedBlockLog(); + SCMDeletedBlockTransactionStatusManager statusManager = + deletedBlockLog.getSCMDeletedBlockTransactionStatusManager(); + GenericTestUtils.waitFor( + () -> !EMPTY_SUMMARY.equals(statusManager.getTransactionSummary()), 100, 5000); + HddsProtos.DeletedBlocksTransactionSummary summary = statusManager.getTransactionSummary(); + assertEquals(1, summary.getTotalTransactionCount()); + assertEquals(1, 
summary.getTotalBlockCount()); + assertEquals(value.getBytes(UTF_8).length, summary.getTotalBlockSize()); + assertEquals(value.getBytes(UTF_8).length * 3, summary.getTotalBlockReplicatedSize()); + + // force close the container so that block can be deleted + activeSCM.getClientProtocolServer().closeContainer( + keyDetails.getOzoneKeyLocations().get(0).getContainerID()); + // wait for container to be closed + GenericTestUtils.waitFor(() -> { + try { + return activeSCM.getClientProtocolServer().getContainer( + keyDetails.getOzoneKeyLocations().get(0).getContainerID()) + .getState() == HddsProtos.LifeCycleState.CLOSED; + } catch (IOException e) { + fail("Error while checking container state", e); + return false; + } + }, 100, 5000); + + // flush buffer and start SCMBlockDeletingService + for (StorageContainerManager scm: cluster.getStorageContainerManagersList()) { + flushDBTransactionBuffer(scm); + scm.getScmBlockManager().getSCMBlockDeletingService().start(); + } + + // wait for block deletion transactions to be confirmed by DN + GenericTestUtils.waitFor( + () -> statusManager.getTransactionSummary().getTotalTransactionCount() == 0, 100, 10000); + } + + private Map> generateDeletedBlocks(int dataSize, boolean withSize) { + Map> blockMap = new HashMap<>(); + int continerIDBase = RandomUtils.secure().randomInt(0, 100); + int localIDBase = RandomUtils.secure().randomInt(0, 1000); + for (int i = 0; i < dataSize; i++) { + long containerID = continerIDBase + i; + List blocks = new ArrayList<>(); + for (int j = 0; j < BLOCKS_PER_TX; j++) { + long localID = localIDBase + j; + if (withSize) { + blocks.add(new DeletedBlock(new BlockID(containerID, localID), BLOCK_SIZE, BLOCK_SIZE * 3)); + } else { + blocks.add(new DeletedBlock(new BlockID(containerID, localID), SIZE_NOT_AVAILABLE, SIZE_NOT_AVAILABLE)); + } + } + blockMap.put(containerID, blocks); + } + return blockMap; + } + + private long findLastTx() throws RocksDatabaseException, CodecException { + StorageContainerManager 
activeSCM = cluster.getActiveSCM(); + long lastTxId = -1; + try (Table.KeyValueIterator iter = + activeSCM.getScmMetadataStore().getDeletedBlocksTXTable().iterator()) { + while (iter.hasNext()) { + Table.KeyValue entry = iter.next(); + if (lastTxId < entry.getKey()) { + lastTxId = entry.getKey(); + } + } + } + return lastTxId; + } + + private void waitForScmsToFinalize(Collection scms) + throws Exception { + for (StorageContainerManager scm: scms) { + waitForScmToFinalize(scm); + } + } + + private void waitForScmToFinalize(StorageContainerManager scm) + throws Exception { + GenericTestUtils.waitFor(() -> !scm.isInSafeMode(), 500, 5000); + GenericTestUtils.waitFor(() -> { + FinalizationCheckpoint checkpoint = + scm.getScmContext().getFinalizationCheckpoint(); + LOG.info("Waiting for SCM {} (leader? {}) to finalize. Current " + + "finalization checkpoint is {}", + scm.getSCMNodeId(), scm.checkLeader(), checkpoint); + return checkpoint.hasCrossed( + FinalizationCheckpoint.FINALIZATION_COMPLETE); + }, 2_000, 60_000); + } + + private void flushDBTransactionBuffer(StorageContainerManager scm) throws IOException { + DBTransactionBuffer dbTxBuffer = scm.getScmHAManager().getDBTransactionBuffer(); + if (dbTxBuffer instanceof SCMHADBTransactionBuffer) { + SCMHADBTransactionBuffer buffer = (SCMHADBTransactionBuffer) dbTxBuffer; + buffer.flush(); + } + } + + private ArrayList getRowsInTable(Table table) + throws IOException { + ArrayList txIdList = new ArrayList<>(); + if (table != null) { + try (Table.KeyValueIterator keyValueTableIterator = table.iterator()) { + while (keyValueTableIterator.hasNext()) { + txIdList.add(keyValueTableIterator.next().getKey()); + } + } + } + return txIdList; + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestBlockDeletionService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestBlockDeletionService.java new file mode 100644 index 
000000000000..5516107266fc --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestBlockDeletionService.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.service; + +import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.HBASE_SUPPORT; +import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.STORAGE_SPACE_DISTRIBUTION; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import 
org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.block.BlockManager; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMPerformanceMetrics; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.scm.server.SCMConfigurator; +import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.scm.server.upgrade.SCMUpgradeFinalizationContext; +import org.apache.hadoop.hdds.upgrade.TestHddsUpgradeUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.UniformDatanodesFactory; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeletedBlock; +import org.apache.hadoop.ozone.om.helpers.QuotaUtil; +import org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; + +/** + * DeletionService test to Pass Usage from OM to SCM. 
+ */ +public class TestBlockDeletionService { + private static final String CLIENT_ID = UUID.randomUUID().toString(); + private static final String VOLUME_NAME = "vol1"; + private static final String BUCKET_NAME = "bucket1"; + private static final int KEY_SIZE = 5 * 1024; // 5 KB + private static MiniOzoneCluster cluster; + private static StorageContainerLocationProtocol scmClient; + private static OzoneBucket bucket; + private static SCMPerformanceMetrics metrics; + + public static Stream replicationConfigProvider() { + return Stream.of( + arguments(RatisReplicationConfig.getInstance(ReplicationFactor.ONE.toProto())), + arguments(RatisReplicationConfig.getInstance(ReplicationFactor.THREE.toProto())), + arguments(new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 2 * 1024 * 1024)), + arguments(new ECReplicationConfig(6, 3, ECReplicationConfig.EcCodec.RS, 2 * 1024 * 1024)), + arguments(StandaloneReplicationConfig.getInstance(ReplicationFactor.ONE.toProto())), + arguments(StandaloneReplicationConfig.getInstance(ReplicationFactor.THREE.toProto())) + ); + } + + @BeforeAll + public static void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500, TimeUnit.MILLISECONDS); + conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY, HBASE_SUPPORT.layoutVersion()); + + InjectedUpgradeFinalizationExecutor + scmFinalizationExecutor = new InjectedUpgradeFinalizationExecutor<>(); + SCMConfigurator configurator = new SCMConfigurator(); + configurator.setUpgradeFinalizationExecutor(scmFinalizationExecutor); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(9) + .setSCMConfigurator(configurator) + .setDatanodeFactory(UniformDatanodesFactory.newBuilder() + .setLayoutVersion(HBASE_SUPPORT.layoutVersion()).build()) + .build(); + cluster.waitForClusterToBeReady(); + scmClient = cluster.getStorageContainerLocationClient(); + assertEquals(HBASE_SUPPORT.ordinal(), + 
cluster.getStorageContainerManager().getLayoutVersionManager().getMetadataLayoutVersion()); + metrics = cluster.getStorageContainerManager().getBlockProtocolServer().getMetrics(); + + OzoneClient ozoneClient = cluster.newClient(); + // create a volume and a bucket to be used by OzoneFileSystem + ozoneClient.getObjectStore().createVolume(VOLUME_NAME); + ozoneClient.getObjectStore().getVolume(VOLUME_NAME).createBucket(BUCKET_NAME); + bucket = ozoneClient.getObjectStore().getVolume(VOLUME_NAME).getBucket(BUCKET_NAME); + } + + @AfterAll + public static void teardown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testDeleteKeyQuotaWithUpgrade() throws Exception { + long initialSuccessBlocks = metrics.getDeleteKeySuccessBlocks(); + long initialFailedBlocks = metrics.getDeleteKeyFailedBlocks(); + + ReplicationConfig replicationConfig = RatisReplicationConfig.getInstance(ReplicationFactor.THREE.toProto()); + // PRE-UPGRADE + // Step 1: write a key + String keyName = UUID.randomUUID().toString(); + createKey(keyName, replicationConfig); + // Step 2: Spy on BlockManager and inject it into SCM + BlockManager spyManagerBefore = injectSpyBlockManager(cluster); + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + // Step 3: Delete the key (which triggers deleteBlocks call) + bucket.deleteKey(keyName); + // Step 4: Verify deleteBlocks call and capture argument + verify(spyManagerBefore, timeout(50000).atLeastOnce()).deleteBlocks(captor.capture()); + verifyAndAssertQuota(replicationConfig, captor); + GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() - initialSuccessBlocks == 1, 50, 1000); + GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() - initialFailedBlocks == 0, 50, 1000); + + // UPGRADE SCM (if specified) + // Step 5: wait for finalizing upgrade + Future finalizationFuture = Executors.newSingleThreadExecutor().submit(() -> { + try { + scmClient.finalizeScmUpgrade(CLIENT_ID); + } catch 
(IOException ex) { + fail("finalization client failed", ex); + } + }); + finalizationFuture.get(); + TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID); + assertEquals(STORAGE_SPACE_DISTRIBUTION.ordinal(), + cluster.getStorageContainerManager().getLayoutVersionManager().getMetadataLayoutVersion()); + + // POST-UPGRADE + //Step 6: Repeat the same steps in pre-upgrade + keyName = UUID.randomUUID().toString(); + createKey(keyName, replicationConfig); + BlockManager spyManagerAfter = injectSpyBlockManager(cluster); + bucket.deleteKey(keyName); + verify(spyManagerAfter, timeout(50000).atLeastOnce()).deleteBlocks(captor.capture()); + verifyAndAssertQuota(replicationConfig, captor); + GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() - initialSuccessBlocks == 2, 50, 1000); + GenericTestUtils.waitFor(() -> metrics.getDeleteKeyFailedBlocks() - initialFailedBlocks == 0, 50, 1000); + } + + @ParameterizedTest + @MethodSource("replicationConfigProvider") + public void testDeleteKeyQuotaWithDifferentReplicationTypes(ReplicationConfig replicationConfig) throws Exception { + long initialSuccessBlocks = metrics.getDeleteKeySuccessBlocks(); + long initialFailedBlocks = metrics.getDeleteKeyFailedBlocks(); + + // Step 1: write a key + String keyName = UUID.randomUUID().toString(); + createKey(keyName, replicationConfig); + // Step 2: Spy on BlockManager and inject it into SCM + BlockManager spyManagerBefore = injectSpyBlockManager(cluster); + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + // Step 3: Delete the key (which triggers deleteBlocks call) + bucket.deleteKey(keyName); + // Step 4: Verify deleteBlocks call and capture argument + verify(spyManagerBefore, timeout(50000).atLeastOnce()).deleteBlocks(captor.capture()); + verifyAndAssertQuota(replicationConfig, captor); + GenericTestUtils.waitFor(() -> metrics.getDeleteKeySuccessBlocks() - initialSuccessBlocks == 1, 50, 1000); + GenericTestUtils.waitFor(() -> 
metrics.getDeleteKeyFailedBlocks() - initialFailedBlocks == 0, 50, 1000); + } + + private void createKey(String keyName, ReplicationConfig replicationConfig) throws IOException { + byte[] data = new byte[KEY_SIZE]; + try (OzoneOutputStream out = bucket.createKey(keyName, KEY_SIZE, + replicationConfig, new HashMap<>())) { + out.write(data); + } + } + + private BlockManager injectSpyBlockManager(MiniOzoneCluster miniOzoneCluster) throws Exception { + StorageContainerManager scm = miniOzoneCluster.getStorageContainerManager(); + BlockManager realManager = scm.getScmBlockManager(); + BlockManager spyManager = spy(realManager); + + Field field = scm.getClass().getDeclaredField("scmBlockManager"); + field.setAccessible(true); + field.set(scm, spyManager); + return spyManager; + } + + private void verifyAndAssertQuota(ReplicationConfig replicationConfig, + ArgumentCaptor> captor) throws IOException { + int index = captor.getAllValues().size() - 1; + List blockGroups = captor.getAllValues().get(index); + + long totalUsedBytes = blockGroups.stream() + .flatMap(group -> group.getDeletedBlocks().stream()) + .mapToLong(DeletedBlock::getReplicatedSize).sum(); + + long totalUnreplicatedBytes = blockGroups.stream() + .flatMap(group -> group.getDeletedBlocks().stream()) + .mapToLong(DeletedBlock::getSize).sum(); + + assertEquals(1, blockGroups.get(0).getDeletedBlocks().size()); + assertEquals(QuotaUtil.getReplicatedSize(KEY_SIZE, replicationConfig), totalUsedBytes); + assertEquals(KEY_SIZE, totalUnreplicatedBytes); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestDeletedBlocksTxnShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestDeletedBlocksTxnShell.java new file mode 100644 index 000000000000..3f4c12f5347a --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestDeletedBlocksTxnShell.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.shell; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import 
org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.admin.scm.GetDeletedBlockSummarySubcommand; +import org.apache.hadoop.ozone.common.DeletedBlock; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test for DeletedBlocksTxnSubcommand Cli. + */ +public class TestDeletedBlocksTxnShell { + + private static final Logger LOG = LoggerFactory + .getLogger(TestDeletedBlocksTxnShell.class); + + private final PrintStream originalOut = System.out; + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private MiniOzoneHAClusterImpl cluster = null; + private OzoneConfiguration conf; + private int numOfSCMs = 3; + + private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name(); + private static final int BLOCKS_PER_TX = 5; + private static final int BLOCK_SIZE = 100; + private static final int BLOCK_REPLICATED_SIZE = 300; + + @TempDir + private Path tempDir; + + /** + * Create a MiniOzoneHACluster for testing. 
+ * + * @throws IOException + */ + @BeforeEach + public void init() throws Exception { + conf = new OzoneConfiguration(); + String scmServiceId = "scm-service-test1"; + + cluster = MiniOzoneCluster.newHABuilder(conf) + .setSCMServiceId(scmServiceId) + .setNumOfStorageContainerManagers(numOfSCMs) + .setNumOfActiveSCMs(numOfSCMs) + .setNumOfOzoneManagers(1) + .build(); + cluster.waitForClusterToBeReady(); + + File txnFile = tempDir.resolve("txn.txt").toFile(); + LOG.info("txnFile path: {}", txnFile.getAbsolutePath()); + System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING)); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterEach + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + System.setOut(originalOut); + } + + //> + private Map> generateData(int dataSize) throws Exception { + Map> blockMap = new HashMap<>(); + int continerIDBase = RandomUtils.secure().randomInt(0, 100); + int localIDBase = RandomUtils.secure().randomInt(0, 1000); + for (int i = 0; i < dataSize; i++) { + long containerID = continerIDBase + i; + updateContainerMetadata(containerID); + List blocks = new ArrayList<>(); + for (int j = 0; j < BLOCKS_PER_TX; j++) { + long localID = localIDBase + j; + blocks.add(new DeletedBlock(new BlockID(containerID, localID), BLOCK_SIZE, BLOCK_REPLICATED_SIZE)); + } + blockMap.put(containerID, blocks); + } + return blockMap; + } + + private void updateContainerMetadata(long cid) throws Exception { + final ContainerInfo container = + new ContainerInfo.Builder() + .setContainerID(cid) + .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) + .setState(HddsProtos.LifeCycleState.CLOSED) + .setOwner("TestDeletedBlockLog") + .setPipelineID(PipelineID.randomId()) + .build(); + final Set replicaSet = cluster.getHddsDatanodes() + .subList(0, 3) + .stream() + .map(dn -> ContainerReplica.newBuilder() + .setContainerID(container.containerID()) + .setContainerState(State.CLOSED) + 
.setDatanodeDetails(dn.getDatanodeDetails()) + .build()) + .collect(Collectors.toSet()); + ContainerStateManager containerStateManager = getSCMLeader(). + getContainerManager().getContainerStateManager(); + containerStateManager.addContainer(container.getProtobuf()); + for (ContainerReplica replica: replicaSet) { + containerStateManager.updateContainerReplica(replica); + } + } + + private StorageContainerManager getSCMLeader() { + return cluster.getStorageContainerManagersList() + .stream().filter(a -> a.getScmContext().isLeaderReady()) + .collect(Collectors.toList()).get(0); + } + + private void flush() throws Exception { + // only flush leader here, avoid the follower concurrent flush and write + getSCMLeader().getScmHAManager().asSCMHADBTransactionBuffer().flush(); + } + + @Test + public void testGetDeletedBlockSummarySubcommand() throws Exception { + int currentValidTxnNum; + // add 30 block deletion transactions + DeletedBlockLog deletedBlockLog = getSCMLeader(). + getScmBlockManager().getDeletedBlockLog(); + deletedBlockLog.addTransactions(generateData(30)); + flush(); + currentValidTxnNum = deletedBlockLog.getNumOfValidTransactions(); + LOG.info("Valid num of txns: {}", currentValidTxnNum); + assertEquals(30, currentValidTxnNum); + DeletedBlocksTransactionSummary summary = deletedBlockLog.getTransactionSummary(); + assertEquals(30, summary.getTotalTransactionCount()); + assertEquals(30 * BLOCKS_PER_TX, summary.getTotalBlockCount()); + assertEquals(30 * BLOCKS_PER_TX * BLOCK_SIZE, summary.getTotalBlockSize()); + assertEquals(30 * BLOCKS_PER_TX * BLOCK_REPLICATED_SIZE, summary.getTotalBlockReplicatedSize()); + + GetDeletedBlockSummarySubcommand getDeletedBlockSummarySubcommand = + new GetDeletedBlockSummarySubcommand(); + outContent.reset(); + ContainerOperationClient scmClient = new ContainerOperationClient(conf); + getDeletedBlockSummarySubcommand.execute(scmClient); + String output = outContent.toString(DEFAULT_ENCODING); + assertTrue(output.contains("Total 
number of transactions: 30")); + assertTrue(output.contains("Total number of blocks: 150")); + assertTrue(output.contains("Total size of blocks: 15000")); + assertTrue(output.contains("Total replicated size of blocks: 45000")); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index b6b7e3cf5b41..57067c421344 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl; +import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.node.DeadNodeHandler; @@ -693,6 +694,21 @@ public ReconfigurationHandler getReconfigurationHandler() { return null; } + @Override + public SCMMetadataStore getScmMetadataStore() { + return null; + } + + @Override + public SCMHAManager getScmHAManager() { + return null; + } + + @Override + public SequenceIdGenerator getSequenceIdGen() { + return null; + } + public DBStore getScmDBStore() { return dbStore; }