diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 45d6a0249388..226489482f67 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -93,6 +93,7 @@ public class DeletedBlockLogImpl
   private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();
 
   private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
+  private long lastProcessedTransactionId = -1;
 
   public DeletedBlockLogImpl(ConfigurationSource conf,
                              StorageContainerManager scm,
@@ -344,6 +345,34 @@ public DatanodeDeletedBlockTransactions getTransactions(
     try (TableIterator<Long,
         ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
             deletedBlockLogStateManager.getReadOnlyIterator()) {
+      if (lastProcessedTransactionId != -1) {
+        iter.seek(lastProcessedTransactionId);
+        /*
+         * We should start from transaction (lastProcessedTransactionId + 1).
+         * After the seek, the iterator is pointing at
+         * lastProcessedTransactionId, so read the current value to move
+         * the cursor past it.
+         */
+        if (iter.hasNext()) {
+          /*
+           * There is a possibility that the lastProcessedTransactionId got
+           * deleted from the table; in that case we have to set
+           * lastProcessedTransactionId to the next available transaction in the table.
+           *
+           * By doing this there is a chance that we will skip processing the new
+           * lastProcessedTransactionId; that should be ok, we can get to it in the
+           * next run.
+           */
+          lastProcessedTransactionId = iter.next().getKey();
+        }
+
+        // If we have reached the end, go to the beginning.
+        if (!iter.hasNext()) {
+          iter.seekToFirst();
+          lastProcessedTransactionId = -1;
+        }
+      }
+
       // Get the CmdStatus status of the aggregation, so that the current
       // status of the specified transaction can be found faster
       Map<UUID, Map<Long, CmdStatus>> commandStatus =
@@ -352,13 +381,14 @@ public DatanodeDeletedBlockTransactions getTransactions(
               map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
       ArrayList<Long> txIDs = new ArrayList<>();
       metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
+      Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
       // Here takes block replica count as the threshold to avoid the case
       // that part of replicas committed the TXN and recorded in the
       // SCMDeletedBlockTransactionStatusManager, while they are counted
       // in the threshold.
       while (iter.hasNext() &&
           transactions.getBlocksDeleted() < blockDeletionLimit) {
-        Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
+        keyValue = iter.next();
         DeletedBlocksTransaction txn = keyValue.getValue();
         final ContainerID id = ContainerID.valueOf(txn.getContainerID());
         try {
@@ -386,7 +416,24 @@ public DatanodeDeletedBlockTransactions getTransactions(
           LOG.warn("Container: {} was not found for the transaction: {}.",
               id, txn);
           txIDs.add(txn.getTxID());
         }
+
+        if (lastProcessedTransactionId == keyValue.getKey()) {
+          // We have circled back to the last transaction.
+          break;
+        }
+
+        if (!iter.hasNext() && lastProcessedTransactionId != -1) {
+          /*
+           * We started from in-between and reached the end of the table;
+           * now we should go to the start of the table and process
+           * the transactions.
+           */
+          iter.seekToFirst();
+        }
       }
+
+      lastProcessedTransactionId = keyValue != null ? keyValue.getKey() : -1;
+
       if (!txIDs.isEmpty()) {
         deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
         metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
index 6e6440c324b6..43809acf4bc6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogStateManagerImpl.java
@@ -133,7 +133,8 @@ public void close() throws IOException {
 
       @Override
       public void seekToFirst() {
-        throw new UnsupportedOperationException("seekToFirst");
+        iter.seekToFirst();
+        findNext();
       }
 
       @Override
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index c8e2f267aff3..2a012cbe180e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -441,6 +441,75 @@ public void testResetCount() throws Exception {
     assertEquals(30 * THREE, blocks.size());
   }
 
+  @Test
+  public void testSCMDelIteratorProgress() throws Exception {
+    int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
+
+    // CASE1: All transactions are valid and available.
+    // Create 8 TXs in the log.
+    int noOfTransactions = 8;
+    addTransactions(generateData(noOfTransactions), true);
+    mockContainerHealthResult(true);
+    List<DeletedBlocksTransaction> blocks;
+
+    List<Long> txIDs = new ArrayList<>();
+    int i = 1;
+    while (i < noOfTransactions) {
+      // In each iteration read two transactions; the API returns the transactions in order.
+      // 1st iteration: {1, 2}
+      // 2nd iteration: {3, 4}
+      // 3rd iteration: {5, 6}
+      // 4th iteration: {7, 8}
+      blocks = getTransactions(2 * BLOCKS_PER_TXN * THREE);
+      assertEquals(blocks.get(0).getTxID(), i++);
+      assertEquals(blocks.get(1).getTxID(), i++);
+    }
+
+    // CASE2: Some transactions are not available for delete in the current iteration,
+    // either because the max retry count was reached or due to some other issue.
+    // New transaction IDs are {9, 10, 11, 12, 13, 14, 15, 16}.
+    addTransactions(generateData(noOfTransactions), true);
+    mockContainerHealthResult(true);
+
+    // Mark transaction 11 as having reached the max retry count so that it is ignored
+    // by the SCM deleting service while fetching transactions for delete.
+    int ignoreTransactionId = 11;
+    txIDs.add((long) ignoreTransactionId);
+    for (i = 0; i < maxRetry; i++) {
+      incrementCount(txIDs);
+    }
+    incrementCount(txIDs);
+
+    i = 9;
+    while (true) {
+      // In each iteration read two transactions.
+      // Any transaction that is not available for delete in the current iteration
+      // is skipped and re-checked only after the complete table has been read.
+      // 1st iteration: {9, 10}
+      // 2nd iteration: {12, 13} Transaction 11 is ignored here.
+      // 3rd iteration: {14, 15} Transaction 11 is available again here,
+      // but it will be read only after all db records are read till the end.
+      // 4th iteration: {16, 11} Since the iterator reached the end of the table after reading transaction 16,
+      // it starts from the beginning again and returns transaction 11 as well.
+      blocks = getTransactions(2 * BLOCKS_PER_TXN * THREE);
+      if (i == ignoreTransactionId) {
+        i++;
+      }
+      assertEquals(blocks.get(0).getTxID(), i++);
+      if (i == 17) {
+        assertEquals(blocks.get(1).getTxID(), ignoreTransactionId);
+        break;
+      }
+      assertEquals(blocks.get(1).getTxID(), i++);
+
+      if (i == 14) {
+        // Reset transaction 11 so that it becomes available to the SCM key deleting service in subsequent iterations.
+        resetCount(txIDs);
+      }
+    }
+  }
+
   @Test
   public void testCommitTransactions() throws Exception {
     deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);