@@ -36,10 +36,12 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -61,7 +63,7 @@
import org.slf4j.LoggerFactory;

/**
* A implement class of {@link DeletedBlockLog}, and it uses
* An implement class of {@link DeletedBlockLog}, and it uses
* K/V db to maintain block deletion transactions between scm and datanode.
* This is a very basic implementation, it simply scans the log and
* memorize the position that scanned by last time, and uses this to
@@ -81,7 +83,7 @@ public class DeletedBlockLogImpl
private final Lock lock;
// The access to DeletedBlocksTXTable is protected by
// DeletedBlockLogStateManager.
private final DeletedBlockLogStateManager deletedBlockLogStateManager;
private DeletedBlockLogStateManager deletedBlockLogStateManager;
private final SCMContext scmContext;
private final SequenceIdGenerator sequenceIdGen;
private final ScmBlockDeletingServiceMetrics metrics;
@@ -91,6 +93,7 @@ public class DeletedBlockLogImpl

private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
private long lastProcessedTransactionId = -1;
private final int logAppenderQueueByteLimit;

public DeletedBlockLogImpl(ConfigurationSource conf,
StorageContainerManager scm,
@@ -116,6 +119,16 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
this.transactionStatusManager =
new SCMDeletedBlockTransactionStatusManager(deletedBlockLogStateManager,
containerManager, metrics, scmCommandTimeoutMs);
int limit = (int) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT,
ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
this.logAppenderQueueByteLimit = (int) (limit * 0.9);
}

@VisibleForTesting
void setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) {
this.deletedBlockLogStateManager = manager;
}

@Override
@@ -285,16 +298,27 @@ public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
lock.lock();
try {
ArrayList<DeletedBlocksTransaction> txsToBeAdded = new ArrayList<>();
long currentBatchSizeBytes = 0;
for (Map.Entry< Long, List< Long > > entry :
containerBlocksMap.entrySet()) {
long nextTXID = sequenceIdGen.getNextId(DEL_TXN_ID);
DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
entry.getKey(), entry.getValue());
txsToBeAdded.add(tx);
long txSize = tx.getSerializedSize();
currentBatchSizeBytes += txSize;

if (currentBatchSizeBytes >= logAppenderQueueByteLimit) {
deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
txsToBeAdded.clear();
currentBatchSizeBytes = 0;
}
}
if (!txsToBeAdded.isEmpty()) {
deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
}

deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
} finally {
lock.unlock();
}
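
The change above caps how much is handed to the state manager in one shot: instead of writing every transaction from a single addTransactions call as one addTransactionsToDB batch, it flushes whenever the accumulated serialized size reaches 90% of OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, then flushes the remainder at the end. A minimal, self-contained sketch of the same size-bounded batching pattern (the class name, the hard-coded 1 KB limit, and the println stand-in for addTransactionsToDB are illustrative, not Ozone code):

import java.util.ArrayList;
import java.util.List;

/** Illustrative size-bounded batcher; the limit and the flush target are stand-ins. */
public class ByteLimitedBatcher {

  // Assumed hard limit for the sketch; DeletedBlockLogImpl reads it from configuration.
  private static final int QUEUE_BYTE_LIMIT = 1024;
  // Keep ~10% headroom below the hard limit, mirroring the 0.9 factor in the patch.
  private static final int FLUSH_THRESHOLD = (int) (QUEUE_BYTE_LIMIT * 0.9);

  private final List<byte[]> batch = new ArrayList<>();
  private long batchSizeBytes = 0;

  /** Buffers one serialized record and flushes once the threshold is crossed. */
  public void add(byte[] serializedRecord) {
    batch.add(serializedRecord);
    batchSizeBytes += serializedRecord.length;
    if (batchSizeBytes >= FLUSH_THRESHOLD) {
      flush();
    }
  }

  /** Flushes whatever is left after the caller has added all records. */
  public void finish() {
    if (!batch.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    // Stand-in for deletedBlockLogStateManager.addTransactionsToDB(batch).
    System.out.println("flushing " + batch.size() + " records, "
        + batchSizeBytes + " bytes");
    batch.clear();
    batchSizeBytes = 0;
  }

  public static void main(String[] args) {
    ByteLimitedBatcher batcher = new ByteLimitedBatcher();
    for (int i = 0; i < 100; i++) {
      batcher.add(new byte[64]); // 100 records of 64 bytes force several flushes
    }
    batcher.finish();
  }
}

Flushing at 90% rather than 100% of the limit leaves headroom, so a batch that overshoots the threshold by the size of its last transaction should, in the common case, still stay under the appender queue limit.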
@@ -21,9 +21,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
@@ -48,6 +50,7 @@
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -57,6 +60,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -434,6 +438,21 @@ public void testResetCount() throws Exception {
assertEquals(30 * THREE, blocks.size());
}

@Test
public void testAddTransactionsIsBatched() throws Exception {
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, 1, StorageUnit.KB);

DeletedBlockLogStateManager mockStateManager = mock(DeletedBlockLogStateManager.class);
DeletedBlockLogImpl log = new DeletedBlockLogImpl(conf, scm, containerManager, scmHADBTransactionBuffer, metrics);

log.setDeletedBlockLogStateManager(mockStateManager);

Map<Long, List<Long>> containerBlocksMap = generateData(100);
log.addTransactions(containerBlocksMap);

verify(mockStateManager, atLeast(2)).addTransactionsToDB(any());
}

@Test
public void testSCMDelIteratorProgress() throws Exception {
int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
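
The new testAddTransactionsIsBatched lowers the appender queue byte limit to 1 KB, so the roughly 920-byte flush threshold is crossed well before all 100 generated container transactions are buffered, and the mocked state manager must therefore be invoked at least twice. If one also wanted to check that no transactions are lost across batches, a Mockito ArgumentCaptor could be appended to that test along these lines (an illustrative extension, not part of this change; it assumes an org.mockito.ArgumentCaptor import and that addTransactionsToDB accepts the ArrayList built up in addTransactions):

// Illustrative follow-up assertion, not part of this PR.
ArgumentCaptor<ArrayList<DeletedBlocksTransaction>> batchCaptor =
    ArgumentCaptor.forClass((Class) ArrayList.class);
verify(mockStateManager, atLeast(2)).addTransactionsToDB(batchCaptor.capture());

// addTransactions creates one transaction per container entry, so the
// flushed batches together should cover the whole input map.
long persisted = batchCaptor.getAllValues().stream()
    .mapToLong(ArrayList::size)
    .sum();
assertEquals(containerBlocksMap.size(), persisted);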