@@ -81,6 +81,8 @@ public class DeletedBlockLogImpl
   private final Lock lock;
   // Maps txId to set of DNs which are successful in committing the transaction
   private Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private Map<Long, Integer> transactionRetryCountMap;
 
   private final AtomicLong largestTxnId;
   // largest transactionId is stored at largestTxnIdHolderKey
@@ -102,7 +104,7 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
 
     // maps transaction to dns which have committed it.
     transactionToDNsCommitMap = new ConcurrentHashMap<>();
-
+    transactionRetryCountMap = new ConcurrentHashMap<>();
     this.largestTxnId = new AtomicLong(this.getLargestRecordedTXID());
   }

@@ -187,19 +189,30 @@ public void incrementCount(List<Long> txIDs) throws IOException {
         }
         continue;
       }
-      DeletedBlocksTransaction.Builder builder = block.toBuilder();
-      int currentCount = block.getCount();
-      if (currentCount > -1) {
-        builder.setCount(++currentCount);
-      }
-      // if the retry time exceeds the maxRetry value
-      // then set the retry value to -1, stop retrying, admins can
-      // analyze those blocks and purge them manually by SCMCli.
-      if (currentCount > maxRetry) {
-        builder.setCount(-1);
-      }
-      scmMetadataStore.getDeletedBlocksTXTable().put(txID,
-          builder.build());
+      int currentCount =
+          transactionRetryCountMap.getOrDefault(txID, block.getCount());
+      int nextCount = currentCount + 1;
+      DeletedBlocksTransaction.Builder builder = block.toBuilder();
+      if (nextCount > maxRetry) {
+        // if the retry time exceeds the maxRetry value
+        // then set the retry value to -1, stop retrying, admins can
+        // analyze those blocks and purge them manually by SCMCli.
+        builder.setCount(-1);
+        scmMetadataStore.getDeletedBlocksTXTable().put(txID,
+            builder.build());
+        transactionRetryCountMap.remove(txID);
+      } else if (nextCount % 100 == 0) {
+        // write retry count after every 100 retries into DB.
+        builder.setCount(nextCount);
+        scmMetadataStore.getDeletedBlocksTXTable().put(txID,
+            builder.build());
+        transactionRetryCountMap.put(txID, nextCount);
+      } else {
+        transactionRetryCountMap.put(txID, nextCount);
+      }
     } catch (IOException ex) {
       LOG.warn("Cannot increase count for txID " + txID, ex);
       // We do not throw error here, since we don't want to abort the loop.
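
Net effect of this hunk: the retry count now lives in transactionRetryCountMap and is written to the RocksDB-backed table only at every 100th retry or when the count goes terminal (-1), instead of on every increment. A minimal sketch of that write-behind pattern in isolation follows; BatchedRetryCounter, the plain Map standing in for the table, and FLUSH_INTERVAL are illustrative names, not classes from this PR:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class BatchedRetryCounter {
  // Mirrors the "% 100" checkpoint interval in the patch.
  private static final int FLUSH_INTERVAL = 100;

  private final int maxRetry;
  private final Map<Long, Integer> db;       // stand-in for the persisted table
  private final Map<Long, Integer> inMemory = new ConcurrentHashMap<>();

  public BatchedRetryCounter(int maxRetry, Map<Long, Integer> db) {
    this.maxRetry = maxRetry;
    this.db = db;
  }

  public void increment(long txID) {
    // Seed from the persisted count the first time a txID is seen
    // (e.g. after a restart), like getOrDefault in the patch.
    int next = inMemory.getOrDefault(txID, db.getOrDefault(txID, 0)) + 1;
    if (next > maxRetry) {
      db.put(txID, -1);             // terminal marker must survive a restart
      inMemory.remove(txID);
    } else if (next % FLUSH_INTERVAL == 0) {
      db.put(txID, next);           // checkpoint: one DB write per 100 increments
      inMemory.put(txID, next);
    } else {
      inMemory.put(txID, next);     // common case: memory only, no I/O
    }
  }
}

The trade-off is durability of the counter: if SCM restarts between checkpoints, up to 99 increments are forgotten and counting resumes from the last flushed value. That is acceptable here because the count only decides when to give up retrying, while the terminal -1 and the periodic checkpoints are still made durable.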
@@ -274,6 +287,7 @@ public void commitTransactions(
         .collect(Collectors.toList());
     if (dnsWithCommittedTxn.containsAll(containerDns)) {
       transactionToDNsCommitMap.remove(txID);
+      transactionRetryCountMap.remove(txID);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Purging txId={} from block deletion log", txID);
       }
@@ -429,6 +443,10 @@ public DatanodeDeletedBlockTransactions getTransactions(
       }
     }
     purgeTransactions(txnsToBePurged);
+    for (DeletedBlocksTransaction trx : txnsToBePurged) {
+      transactionRetryCountMap.remove(trx.getTxID());
+      transactionToDNsCommitMap.remove(trx.getTxID());
+    }
   }
   return transactions;
 } finally {

Review thread on the loop above:

Contributor: Unrelated to PR: Can we also remove from transactionToDNsCommitMap here for better surety?

@amaliujia (Contributor, Author), Jan 22, 2021: In fact this might be a bug: for purgeTransactions I didn't see that transactionToDNsCommitMap is cleaned up properly. Added the cleanup of transactionToDNsCommitMap here.
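
Both maps are keyed by txID and live for the lifetime of the SCM process, so purging a transaction without removing its entries would strand one entry per purged transaction in transactionToDNsCommitMap (and now transactionRetryCountMap) until restart; the added loop closes that leak.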
@@ -41,10 +41,10 @@
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
     .DeleteBlockTransactionResult;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -238,6 +238,53 @@ public void testIncrementCount() throws Exception {
     Assert.assertEquals(blocks.size(), 0);
   }
 
+  @Test
+  public void testIncrementCountLessFrequentWritingToDB() throws Exception {
+    OzoneConfiguration testConf = OzoneConfiguration.of(conf);
+    testConf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 120);
+
+    deletedBlockLog = new DeletedBlockLogImpl(testConf, containerManager,
+        scm.getScmMetadataStore());
+
+    for (Map.Entry<Long, List<Long>> entry :
+        generateData(1).entrySet()) {
+      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+    }
+
+    List<DeletedBlocksTransaction> blocks =
+        getTransactions(40 * BLOCKS_PER_TXN);
+    List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
+        .collect(Collectors.toList());
+
+    for (int i = 0; i < 50; i++) {
+      deletedBlockLog.incrementCount(txIDs);
+    }
+    blocks = getTransactions(40 * BLOCKS_PER_TXN);
+    for (DeletedBlocksTransaction block : blocks) {
+      // block count should not be updated as there are only 50 retries.
+      Assert.assertEquals(0, block.getCount());
+    }
+
+    for (int i = 0; i < 60; i++) {
+      deletedBlockLog.incrementCount(txIDs);
+    }
+    blocks = getTransactions(40 * BLOCKS_PER_TXN);
+    for (DeletedBlocksTransaction block : blocks) {
+      // block count should be updated to 100 as there are already 110 retries.
+      Assert.assertEquals(100, block.getCount());
+    }
+
+    for (int i = 0; i < 50; i++) {
+      deletedBlockLog.incrementCount(txIDs);
+    }
+    blocks = getTransactions(40 * BLOCKS_PER_TXN);
+    for (DeletedBlocksTransaction block : blocks) {
+      // block count should be updated to -1 as retry count exceeds maxRetry
+      // (i.e. 160 > maxRetry which is 120).
+      Assert.assertEquals(-1, block.getCount());
+    }
+  }
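
The asserted values follow directly from the 100-increment checkpoint interval and maxRetry = 120: after 50 increments nothing has been flushed yet, so the table still reads the initial 0; after 110 increments the last checkpoint happened at 100, so the table reads 100; after 160 increments the count passed maxRetry at increment 121, so the transaction was marked -1 and removed from the in-memory map.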

@Test
public void testCommitTransactions() throws Exception {
for (Map.Entry<Long, List<Long>> entry : generateData(50).entrySet()){