diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index bc675d0ff988..f6fd621393fb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -36,10 +36,12 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -61,7 +63,7 @@ import org.slf4j.LoggerFactory; /** - * A implement class of {@link DeletedBlockLog}, and it uses + * An implement class of {@link DeletedBlockLog}, and it uses * K/V db to maintain block deletion transactions between scm and datanode. * This is a very basic implementation, it simply scans the log and * memorize the position that scanned by last time, and uses this to @@ -81,7 +83,7 @@ public class DeletedBlockLogImpl private final Lock lock; // The access to DeletedBlocksTXTable is protected by // DeletedBlockLogStateManager. - private final DeletedBlockLogStateManager deletedBlockLogStateManager; + private DeletedBlockLogStateManager deletedBlockLogStateManager; private final SCMContext scmContext; private final SequenceIdGenerator sequenceIdGen; private final ScmBlockDeletingServiceMetrics metrics; @@ -91,6 +93,7 @@ public class DeletedBlockLogImpl private static final int LIST_ALL_FAILED_TRANSACTIONS = -1; private long lastProcessedTransactionId = -1; + private final int logAppenderQueueByteLimit; public DeletedBlockLogImpl(ConfigurationSource conf, StorageContainerManager scm, @@ -116,6 +119,16 @@ public DeletedBlockLogImpl(ConfigurationSource conf, this.transactionStatusManager = new SCMDeletedBlockTransactionStatusManager(deletedBlockLogStateManager, containerManager, metrics, scmCommandTimeoutMs); + int limit = (int) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, + ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + this.logAppenderQueueByteLimit = (int) (limit * 0.9); + } + + @VisibleForTesting + void setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) { + this.deletedBlockLogStateManager = manager; } @Override @@ -285,16 +298,27 @@ public void addTransactions(Map> containerBlocksMap) lock.lock(); try { ArrayList txsToBeAdded = new ArrayList<>(); + long currentBatchSizeBytes = 0; for (Map.Entry< Long, List< Long > > entry : containerBlocksMap.entrySet()) { long nextTXID = sequenceIdGen.getNextId(DEL_TXN_ID); DeletedBlocksTransaction tx = constructNewTransaction(nextTXID, entry.getKey(), entry.getValue()); txsToBeAdded.add(tx); + long txSize = tx.getSerializedSize(); + currentBatchSizeBytes += txSize; + + if (currentBatchSizeBytes >= logAppenderQueueByteLimit) { + deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded); + metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size()); + txsToBeAdded.clear(); + currentBatchSizeBytes = 0; + } + } + if (!txsToBeAdded.isEmpty()) { + deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded); + metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size()); } - - deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded); - metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size()); } finally { lock.unlock(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index 4725b30ab8f8..f2289ecb2f5f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -21,9 +21,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.File; @@ -48,6 +50,7 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -57,6 +60,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -434,6 +438,21 @@ public void testResetCount() throws Exception { assertEquals(30 * THREE, blocks.size()); } + @Test + public void testAddTransactionsIsBatched() throws Exception { + conf.setStorageSize(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, 1, StorageUnit.KB); + + DeletedBlockLogStateManager mockStateManager = mock(DeletedBlockLogStateManager.class); + DeletedBlockLogImpl log = new DeletedBlockLogImpl(conf, scm, containerManager, scmHADBTransactionBuffer, metrics); + + log.setDeletedBlockLogStateManager(mockStateManager); + + Map> containerBlocksMap = generateData(100); + log.addTransactions(containerBlocksMap); + + verify(mockStateManager, atLeast(2)).addTransactionsToDB(any()); + } + @Test public void testSCMDelIteratorProgress() throws Exception { int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);