From cb84b6b94b2b020051d860175305a07fbe917b22 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 29 Jul 2025 04:17:34 +0530 Subject: [PATCH 1/9] Decouple key delete ratis request. --- ...ocationProtocolClientSideTranslatorPB.java | 51 +++++- .../org/apache/hadoop/hdds/utils/HAUtils.java | 2 +- .../hdds/scm/TestFailoverWithSCMHA.java | 2 +- .../ozone/TestGetClusterTreeInformation.java | 2 +- .../ozone/om/service/KeyDeletingService.java | 168 +++++++++++++----- 5 files changed, 176 insertions(+), 49 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index a7324d481c48..c17e2b4a6f5d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse; @@ -51,6 +53,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type; import org.apache.hadoop.hdds.scm.AddSCMRequest; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmInfo; import 
org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -68,6 +71,8 @@ import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is the client-side translator to translate the requests made on @@ -78,6 +83,12 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable { + private static final Logger LOG = + LoggerFactory.getLogger(ScmBlockLocationProtocolClientSideTranslatorPB.class); + + private static final double RATIS_LIMIT_FACTOR = 0.9; + private int ratisByteLimit; + /** * RpcController is not used and hence is set to null. */ @@ -93,12 +104,18 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB * failover proxy provider. */ public ScmBlockLocationProtocolClientSideTranslatorPB( - SCMBlockLocationFailoverProxyProvider proxyProvider) { + SCMBlockLocationFailoverProxyProvider proxyProvider, OzoneConfiguration conf) { Preconditions.checkState(proxyProvider != null); this.failoverProxyProvider = proxyProvider; this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create( ScmBlockLocationProtocolPB.class, failoverProxyProvider, failoverProxyProvider.getRetryPolicy()); + int limit = (int) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, + ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + // always go to 90% of max limit for request as other header will be added + this.ratisByteLimit = (int) (limit * RATIS_LIMIT_FACTOR); } /** @@ -230,18 +247,40 @@ public List allocateBlock( @Override public List deleteKeyBlocks( List keyBlocksInfoList) throws IOException { - List keyBlocksProto = keyBlocksInfoList.stream() - 
.map(BlockGroup::getProto).collect(Collectors.toList()); + + List allResults = new ArrayList<>(); + List batch = new ArrayList<>(); + + int serializedSize = 0; + for (BlockGroup bg : keyBlocksInfoList) { + int currSize = bg.getProto().getSerializedSize(); + if (currSize + serializedSize > ratisByteLimit) { + allResults.addAll(submitDeleteKeyBlocks(batch)); + LOG.info("Sending batch of {} KeyBlocks (~{} bytes)", batch.size(), serializedSize); + serializedSize = 0; + batch.clear(); + } + batch.add(bg.getProto()); + serializedSize += currSize; + } + + if (!batch.isEmpty()) { + allResults.addAll(submitDeleteKeyBlocks(batch)); + } + + return allResults; + } + + private List submitDeleteKeyBlocks(List batch) + throws IOException { DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto .newBuilder() - .addAllKeyBlocks(keyBlocksProto) + .addAllKeyBlocks(batch) .build(); - SCMBlockLocationRequest wrapper = createSCMBlockRequest( Type.DeleteScmKeyBlocks) .setDeleteScmKeyBlocksRequest(request) .build(); - final SCMBlockLocationResponse wrappedResponse = handleError(submitRequest(wrapper)); final DeleteScmKeyBlocksResponseProto resp = diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java index 81df3a4cefb3..406736f53107 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java @@ -131,7 +131,7 @@ public static ScmBlockLocationProtocol getScmBlockClient( OzoneConfiguration conf) { ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = new ScmBlockLocationProtocolClientSideTranslatorPB( - new SCMBlockLocationFailoverProxyProvider(conf)); + new SCMBlockLocationFailoverProxyProvider(conf), conf); return TracingUtil .createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class, conf); diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestFailoverWithSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestFailoverWithSCMHA.java index 41d7d2690a71..68617011d71c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestFailoverWithSCMHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestFailoverWithSCMHA.java @@ -112,7 +112,7 @@ public void testFailover() throws Exception { failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId()); ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = new ScmBlockLocationProtocolClientSideTranslatorPB( - failoverProxyProvider); + failoverProxyProvider, conf); GenericTestUtils .setLogLevel(SCMBlockLocationFailoverProxyProvider.class, Level.DEBUG); LogCapturer logCapture = LogCapturer.captureLogs(SCMBlockLocationFailoverProxyProvider.class); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java index 7cc308feee7a..8a88a72b9909 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java @@ -59,7 +59,7 @@ public void testGetClusterTreeInformation() throws IOException { failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId()); ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = new ScmBlockLocationProtocolClientSideTranslatorPB( - failoverProxyProvider); + failoverProxyProvider, conf); InnerNode expectedInnerNode = (InnerNode) scm.getClusterMap().getNode(ROOT); InnerNode actualInnerNode = scmBlockLocationClient.getNetworkTopology(); diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index c9c1237c27eb..19c33ecd7171 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -148,7 +148,7 @@ Pair processKeyDeletes(List keyBlocksList, if (blockDeletionResults != null) { long purgeStartTime = Time.monotonicNow(); purgeResult = submitPurgeKeysRequest(blockDeletionResults, - keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId); + keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId, ratisByteLimit); int limit = getOzoneManager().getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. Limit per task is {}.", @@ -164,15 +164,23 @@ Pair processKeyDeletes(List keyBlocksList, * @param results DeleteBlockGroups returned by SCM. 
* @param keysToModify Updated list of RepeatedOmKeyInfo */ - private Pair submitPurgeKeysRequest(List results, - Map keysToModify, List renameEntriesToBeDeleted, - String snapTableKey, UUID expectedPreviousSnapshotId) throws InterruptedException { + @SuppressWarnings("checkstyle:MethodLength") + private Pair submitPurgeKeysRequest( + List results, + Map keysToModify, + List renameEntriesToBeDeleted, + String snapTableKey, + UUID expectedPreviousSnapshotId, + int ratisLimit) throws InterruptedException { + List purgeKeys = new ArrayList<>(); // Put all keys to be purged in a list int deletedCount = 0; Set failedDeletedKeys = new HashSet<>(); boolean purgeSuccess = true; + + // Step 1: Process DeleteBlockGroupResults for (DeleteBlockGroupResult result : results) { String deletedKey = result.getObjectKey(); if (result.isSuccess()) { @@ -198,25 +206,7 @@ private Pair submitPurgeKeysRequest(List keysToUpdateList = new ArrayList<>(); if (keysToModify != null) { for (Map.Entry keyToModify : @@ -234,27 +224,125 @@ private Pair submitPurgeKeysRequest(List batchPurgeKeys = new ArrayList<>(); + int estimatedPurgeKeysSize = 0; + + while (purgeKeyIndex < purgeKeys.size()) { + String nextKey = purgeKeys.get(purgeKeyIndex); + + List temp = new ArrayList<>(batchPurgeKeys); + temp.add(nextKey); + + OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder() + .setVolumeName("").setBucketName("").addAllKeys(temp).build(); + + PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone(); + tempBuilder.clearDeletedKeys().addDeletedKeys(deletedKeys); + + estimatedPurgeKeysSize = tempBuilder.build().getSerializedSize(); + if (currSize + estimatedPurgeKeysSize > remainingRatisLimit) { + flag = true; + break; + } + + batchPurgeKeys.add(nextKey); + purgeKeyIndex++; } - } - OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder() - .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) - 
.setPurgeKeysRequest(purgeKeysRequest) - .setClientId(getClientId().toString()) - .build(); - - // Submit PurgeKeys request to OM. Acquire bootstrap lock when processing deletes for snapshots. - try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) { - OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest); - if (omResponse != null) { - purgeSuccess = purgeSuccess && omResponse.getSuccess(); + // Now actually add batchPurgeKeys + if (!batchPurgeKeys.isEmpty()) { + OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder() + .setVolumeName("").setBucketName("") + .addAllKeys(batchPurgeKeys).build(); + requestBuilder.clearDeletedKeys().addDeletedKeys(deletedKeys); + currSize += requestBuilder.build().getSerializedSize(); + } + + remainingRatisLimit -= currSize; + + // 3.2 Add keysToUpdate + while (!flag && updateIndex < keysToUpdateList.size()) { + OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = keysToUpdateList.get(updateIndex); + + PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone(); + tempBuilder.addKeysToUpdate(nextUpdate); + int estimatedSize = tempBuilder.build().getSerializedSize(); + + if (currSize + estimatedSize > remainingRatisLimit) { + flag = true; + break; + } + + requestBuilder.addKeysToUpdate(nextUpdate); + currSize += estimatedSize; + updateIndex++; + } + + remainingRatisLimit -= currSize; + + // 3.3 Add renamed keys + while (!flag && renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size()) { + String nextRename = renameEntriesToBeDeleted.get(renameIndex); + + PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone(); + tempBuilder.addRenamedKeys(nextRename); + int estimatedSize = tempBuilder.build().getSerializedSize(); + + if (currSize + estimatedSize > remainingRatisLimit) { + break; + } + + requestBuilder.addRenamedKeys(nextRename); + currSize += estimatedSize; + renameIndex++; + } 
+ + // Finalize and send this batch + OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) + .setPurgeKeysRequest(requestBuilder.build()) + .setClientId(getClientId().toString()) + .build(); + + try (BootstrapStateHandler.Lock lock = + snapTableKey != null ? getBootstrapStateLock().lock() : null) { + OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest); + if (omResponse != null) { + purgeSuccess = purgeSuccess && omResponse.getSuccess(); + } + } catch (ServiceException | InterruptedException e) { + LOG.error("PurgeKey request failed in batch. Will retry at next run.", e); + purgeSuccess = false; + // Continue to next batch instead of returning immediately } - } catch (ServiceException e) { - LOG.error("PurgeKey request failed. Will retry at next run.", e); - return Pair.of(0, false); } return Pair.of(deletedCount, purgeSuccess); From dae7276da824b19b7a601de10385b93e9ae28435 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Thu, 31 Jul 2025 04:10:21 +0530 Subject: [PATCH 2/9] Addressed comments. 
--- .../ozone/om/service/KeyDeletingService.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 19c33ecd7171..033ab4a94dac 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -228,14 +228,14 @@ private Pair submitPurgeKeysRequest( int purgeKeyIndex = 0, updateIndex = 0, renameIndex = 0; int currSize = 0; - boolean flag = false; + boolean batchCapacityReached; while (purgeKeyIndex < purgeKeys.size() || updateIndex < keysToUpdateList.size() || (renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size())) { int remainingRatisLimit = ratisLimit; - flag = false; + batchCapacityReached = false; PurgeKeysRequest.Builder requestBuilder = PurgeKeysRequest.newBuilder(); if (snapTableKey != null) { @@ -269,7 +269,7 @@ private Pair submitPurgeKeysRequest( estimatedPurgeKeysSize = tempBuilder.build().getSerializedSize(); if (currSize + estimatedPurgeKeysSize > remainingRatisLimit) { - flag = true; + batchCapacityReached = true; break; } @@ -282,14 +282,14 @@ private Pair submitPurgeKeysRequest( OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder() .setVolumeName("").setBucketName("") .addAllKeys(batchPurgeKeys).build(); - requestBuilder.clearDeletedKeys().addDeletedKeys(deletedKeys); - currSize += requestBuilder.build().getSerializedSize(); + requestBuilder.addDeletedKeys(deletedKeys); + currSize = requestBuilder.build().getSerializedSize(); } remainingRatisLimit -= currSize; // 3.2 Add keysToUpdate - while (!flag && updateIndex < keysToUpdateList.size()) { + while (!batchCapacityReached && 
updateIndex < keysToUpdateList.size()) { OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = keysToUpdateList.get(updateIndex); PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone(); @@ -297,7 +297,7 @@ private Pair submitPurgeKeysRequest( int estimatedSize = tempBuilder.build().getSerializedSize(); if (currSize + estimatedSize > remainingRatisLimit) { - flag = true; + batchCapacityReached = true; break; } @@ -309,7 +309,8 @@ private Pair submitPurgeKeysRequest( remainingRatisLimit -= currSize; // 3.3 Add renamed keys - while (!flag && renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size()) { + while (!batchCapacityReached && renameEntriesToBeDeleted != null && + renameIndex < renameEntriesToBeDeleted.size()) { String nextRename = renameEntriesToBeDeleted.get(renameIndex); PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone(); From 00835cdb3c19277f6831a321a36492cf838014cb Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Thu, 31 Jul 2025 14:37:46 +0530 Subject: [PATCH 3/9] Added an unit test. 
--- .../hadoop/ozone/om/KeyManagerImpl.java | 10 - .../om/service/TestKeyDeletingService.java | 213 ++++++++++++++---- 2 files changed, 174 insertions(+), 49 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index ca6913edbbb3..b4e2e75c8306 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -775,16 +775,6 @@ public PendingKeysDeletion getPendingDeletionKeys( .addAllBlockIDs(blockIDS).build(); int keyBlockSerializedSize = keyBlocks.getProto().getSerializedSize(); serializedSize += keyBlockSerializedSize; - if (serializedSize > ratisByteLimit) { - maxReqSizeExceeded = true; - if (LOG.isDebugEnabled()) { - LOG.debug( - "Total size of cumulative keys and rename entries in the snapshotRenamedTable in a cycle " + - "crossed 90% ratis limit, serialized size of keys: {}", - serializedSize); - } - break; - } blockGroupList.add(keyBlocks); currentCount++; } else { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 134e9b5f7310..ea16fa37b1d4 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -35,6 +35,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static 
org.mockito.Mockito.mockStatic; @@ -45,6 +46,7 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -115,6 +117,7 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -889,21 +892,153 @@ void checkDeletionForPartiallyCommitKey() throws Exception { } } + @Test + @DisplayName("Verify PurgeKeysRequest is batched according to Ratis byte limit") + void testPurgeKeysRequestBatching() throws Exception { + // Define a small Ratis limit to force multiple batches for testing + // The actual byte size of protobuf messages depends on content. + // A small value like 1KB or 2KB should ensure batching for ~10-20 keys. + final int testRatisLimitBytes = 1024; // 1 KB to encourage multiple batches + + // Create a fresh configuration for this test to control the Ratis limit + OzoneConfiguration testConf = new OzoneConfiguration(); + File innerTestDir = Files.createTempDirectory("TestKDS").toFile(); + ServerUtils.setOzoneMetaDirPath(testConf, innerTestDir.toString()); + + testConf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS); + // Set the specific Ratis limit for this test + testConf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + testRatisLimitBytes, StorageUnit.BYTES); + testConf.setQuietMode(false); + + ScmBlockLocationTestingClient testScmBlockTestingClient = new ScmBlockLocationTestingClient(null, null, 0); + OmTestManagers testOmTestManagers = new OmTestManagers(testConf, testScmBlockTestingClient, null); + KeyManager testKeyManager = testOmTestManagers.getKeyManager(); + testKeyManager.getDeletingService().suspend(); + 
KeyDeletingService testKds = (KeyDeletingService) testKeyManager.getDeletingService(); + OzoneManager testOm = testOmTestManagers.getOzoneManager(); + + try (MockedStatic mockedRatisUtils = + mockStatic(OzoneManagerRatisUtils.class, CALLS_REAL_METHODS)) { + + // Capture all OMRequests submitted via Ratis + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(OzoneManagerProtocolProtos.OMRequest.class); + + // Mock submitRequest to capture requests and return success + mockedRatisUtils.when(() -> OzoneManagerRatisUtils.submitRequest( + any(OzoneManager.class), + requestCaptor.capture(), // Capture the OMRequest here + any(), + anyLong())) + .thenAnswer(invocation -> { + // Return a successful OMResponse for each captured request + return OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) + .setStatus(OzoneManagerProtocolProtos.Status.OK) + .build(); + }); + + final int numKeysToCreate = 50; // Create enough keys to ensure multiple batches + // Create and delete keys using the test-specific managers + createAndDeleteKeys(numKeysToCreate, 1, testOmTestManagers); + + testKds.resume(); + + // Manually trigger the KeyDeletingService to run its task immediately. + // This will initiate the purge requests to Ratis. + testKds.runPeriodicalTaskNow(); + + // Verify that submitRequest was called multiple times. + // The exact number of calls depends on the key size and testRatisLimitBytes, + // but it must be more than one to confirm batching. 
+ mockedRatisUtils.verify(() -> OzoneManagerRatisUtils.submitRequest( + any(OzoneManager.class), any(OzoneManagerProtocolProtos.OMRequest.class), any(), anyLong()), + atLeast(2)); // At least 2 calls confirms batching + + // Get all captured requests that were sent + List capturedRequests = requestCaptor.getAllValues(); + int totalPurgedKeysAcrossBatches = 0; + + // Iterate through each captured Ratis request (batch) + for (OzoneManagerProtocolProtos.OMRequest omRequest : capturedRequests) { + assertNotNull(omRequest); + assertEquals(OzoneManagerProtocolProtos.Type.PurgeKeys, omRequest.getCmdType()); + + OzoneManagerProtocolProtos.PurgeKeysRequest purgeRequest = omRequest.getPurgeKeysRequest(); + + // Assert that the serialized size of each batch is within the configured ratisLimit + // We use isLessThanOrEqualTo because the batching logic ensures it doesn't exceed the limit. + assertThat(omRequest.getSerializedSize()) + .as("Batch size " + omRequest.getSerializedSize() + " should be <= ratisLimit " + testRatisLimitBytes) + .isLessThanOrEqualTo(testRatisLimitBytes); + + // Sum up the keys purged in this particular batch request + if (purgeRequest.getDeletedKeysCount() > 0) { + totalPurgedKeysAcrossBatches += purgeRequest.getDeletedKeys(0).getKeysCount(); + } + } + + // Assert that the sum of keys across all batches equals the total number of keys initially deleted. 
+ assertEquals(numKeysToCreate, totalPurgedKeysAcrossBatches, + "Total keys purged across all batches should match initial keys deleted."); + + } finally { + // Clean up the temporary OzoneManager and its resources + if (testOm.stop()) { + testOm.join(); + } + // Clean up the temporary directory for this test + org.apache.commons.io.FileUtils.deleteDirectory(innerTestDir); + } + } + private void createAndDeleteKeys(int keyCount, int numBlocks) throws IOException { + createAndDeleteKeysInternal(keyCount, numBlocks, null); + } + + private void createAndDeleteKeys(int keyCount, int numBlocks, OmTestManagers testManager) throws IOException { + createAndDeleteKeysInternal(keyCount, numBlocks, testManager); + } + + private void createAndDeleteKeysInternal(int keyCount, int numBlocks, + OmTestManagers testManager) throws IOException { for (int x = 0; x < keyCount; x++) { final String volumeName = getTestName(); final String bucketName = uniqueObjectName("bucket"); final String keyName = uniqueObjectName("key"); - // Create Volume and Bucket - createVolumeAndBucket(volumeName, bucketName, false); - - // Create the key - OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, - keyName, numBlocks); + if (testManager != null) { + // Create volume and bucket manually in metadata manager + OMRequestTestUtils.addVolumeToOM( + testManager.getKeyManager().getMetadataManager(), + OmVolumeArgs.newBuilder() + .setOwnerName("o") + .setAdminName("a") + .setVolume(volumeName) + .build()); + + OMRequestTestUtils.addBucketToOM( + testManager.getKeyManager().getMetadataManager(), + OmBucketInfo.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setIsVersionEnabled(false) + .build()); + + // Create and delete key via provided writeClient + OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, + keyName, numBlocks, 0, testManager.getWriteClient()); + testManager.getWriteClient().deleteKey(keyArg); + + } else { + // Use default client-based creation + 
createVolumeAndBucket(volumeName, bucketName, false); - // Delete the key - writeClient.deleteKey(keyArg); + OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, + keyName, numBlocks); + writeClient.deleteKey(keyArg); + } } } @@ -991,58 +1126,58 @@ private void renameKey(String volumeName, private OmKeyArgs createAndCommitKey(String volumeName, String bucketName, String keyName, int numBlocks) throws IOException { - return createAndCommitKey(volumeName, bucketName, keyName, - numBlocks, 0); + return createAndCommitKey(volumeName, bucketName, keyName, numBlocks, 0, this.writeClient); } private OmKeyArgs createAndCommitKey(String volumeName, - String bucketName, String keyName, int numBlocks, int numUncommitted) - throws IOException { - // Even if no key size is appointed, there will be at least one - // block pre-allocated when key is created - OmKeyArgs keyArg = - new OmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .setAcls(Collections.emptyList()) - .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) - .setDataSize(1000L) - .setLocationInfoList(new ArrayList<>()) - .setOwnerName("user" + RandomStringUtils.secure().nextNumeric(5)) - .build(); - //Open and Commit the Key in the Key Manager. - OpenKeySession session = writeClient.openKey(keyArg); + String bucketName, String keyName, int numBlocks, int numUncommitted) throws IOException { + return createAndCommitKey(volumeName, bucketName, keyName, numBlocks, numUncommitted, this.writeClient); + } - // add pre-allocated blocks into args and avoid creating excessive block - OmKeyLocationInfoGroup keyLocationVersions = session.getKeyInfo(). 
- getLatestVersionLocations(); + private OmKeyArgs createAndCommitKey(String volumeName, + String bucketName, String keyName, int numBlocks, int numUncommitted, + OzoneManagerProtocol customWriteClient) throws IOException { + + OmKeyArgs keyArg = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setAcls(Collections.emptyList()) + .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) + .setDataSize(1000L) + .setLocationInfoList(new ArrayList<>()) + .setOwnerName("user" + RandomStringUtils.secure().nextNumeric(5)) + .build(); + + // Open and Commit the Key in the Key Manager. + OpenKeySession session = customWriteClient.openKey(keyArg); + + OmKeyLocationInfoGroup keyLocationVersions = session.getKeyInfo() + .getLatestVersionLocations(); assert keyLocationVersions != null; - List latestBlocks = keyLocationVersions. - getBlocksLatestVersionOnly(); + + List latestBlocks = keyLocationVersions + .getBlocksLatestVersionOnly(); + int preAllocatedSize = latestBlocks.size(); for (OmKeyLocationInfo block : latestBlocks) { keyArg.addLocationInfo(block); } - // allocate blocks until the blocks num equal to numBlocks LinkedList allocated = new LinkedList<>(); for (int i = 0; i < numBlocks - preAllocatedSize; i++) { - allocated.add(writeClient.allocateBlock(keyArg, session.getId(), - new ExcludeList())); + allocated.add(customWriteClient.allocateBlock(keyArg, session.getId(), new ExcludeList())); } - // remove the blocks not to be committed for (int i = 0; i < numUncommitted; i++) { allocated.removeFirst(); } - // add the blocks to be committed - for (OmKeyLocationInfo block: allocated) { + for (OmKeyLocationInfo block : allocated) { keyArg.addLocationInfo(block); } - writeClient.commitKey(keyArg, session.getId()); + customWriteClient.commitKey(keyArg, session.getId()); return keyArg; } From 7790cbbb85d314d259195c26c5126732a4b28f65 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 19 Aug 2025 13:49:10 +0530 
Subject: [PATCH 4/9] Addressed comments. --- ...mBlockLocationProtocolClientSideTranslatorPB.java | 4 +++- .../hadoop/ozone/om/service/KeyDeletingService.java | 12 +++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index c17e2b4a6f5d..544193521bc8 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -256,7 +256,9 @@ public List deleteKeyBlocks( int currSize = bg.getProto().getSerializedSize(); if (currSize + serializedSize > ratisByteLimit) { allResults.addAll(submitDeleteKeyBlocks(batch)); - LOG.info("Sending batch of {} KeyBlocks (~{} bytes)", batch.size(), serializedSize); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending batch of {} KeyBlocks (~{} bytes)", batch.size(), serializedSize); + } serializedSize = 0; batch.clear(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 033ab4a94dac..752522eb8eb7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -234,7 +234,6 @@ private Pair submitPurgeKeysRequest( updateIndex < keysToUpdateList.size() || (renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size())) { - int remainingRatisLimit = ratisLimit; batchCapacityReached = false; PurgeKeysRequest.Builder 
requestBuilder = PurgeKeysRequest.newBuilder(); @@ -249,7 +248,6 @@ private Pair submitPurgeKeysRequest( } currSize = requestBuilder.build().getSerializedSize(); - remainingRatisLimit -= currSize; // 3.1 Add purgeKeys List batchPurgeKeys = new ArrayList<>(); @@ -268,7 +266,7 @@ private Pair submitPurgeKeysRequest( tempBuilder.clearDeletedKeys().addDeletedKeys(deletedKeys); estimatedPurgeKeysSize = tempBuilder.build().getSerializedSize(); - if (currSize + estimatedPurgeKeysSize > remainingRatisLimit) { + if (currSize + estimatedPurgeKeysSize > ratisLimit) { batchCapacityReached = true; break; } @@ -286,8 +284,6 @@ private Pair submitPurgeKeysRequest( currSize = requestBuilder.build().getSerializedSize(); } - remainingRatisLimit -= currSize; - // 3.2 Add keysToUpdate while (!batchCapacityReached && updateIndex < keysToUpdateList.size()) { OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = keysToUpdateList.get(updateIndex); @@ -296,7 +292,7 @@ private Pair submitPurgeKeysRequest( tempBuilder.addKeysToUpdate(nextUpdate); int estimatedSize = tempBuilder.build().getSerializedSize(); - if (currSize + estimatedSize > remainingRatisLimit) { + if (currSize + estimatedSize > ratisLimit) { batchCapacityReached = true; break; } @@ -306,8 +302,6 @@ private Pair submitPurgeKeysRequest( updateIndex++; } - remainingRatisLimit -= currSize; - // 3.3 Add renamed keys while (!batchCapacityReached && renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size()) { @@ -317,7 +311,7 @@ private Pair submitPurgeKeysRequest( tempBuilder.addRenamedKeys(nextRename); int estimatedSize = tempBuilder.build().getSerializedSize(); - if (currSize + estimatedSize > remainingRatisLimit) { + if (currSize + estimatedSize > ratisLimit) { break; } From 3385ea38eff7730fa9b439e3854a5c1aa3ba256e Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Tue, 19 Aug 2025 19:26:10 +0530 Subject: [PATCH 5/9] Don't serialize to calculate string size. 
--- .../ozone/om/service/KeyDeletingService.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 752522eb8eb7..b7074969d5ec 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -25,6 +25,7 @@ import com.google.protobuf.ServiceException; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -251,26 +252,18 @@ private Pair submitPurgeKeysRequest( // 3.1 Add purgeKeys List batchPurgeKeys = new ArrayList<>(); - int estimatedPurgeKeysSize = 0; while (purgeKeyIndex < purgeKeys.size()) { String nextKey = purgeKeys.get(purgeKeyIndex); - List temp = new ArrayList<>(batchPurgeKeys); - temp.add(nextKey); + int estimatedKeySize = estimateStringEntrySize(nextKey); - OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder() - .setVolumeName("").setBucketName("").addAllKeys(temp).build(); - - PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone(); - tempBuilder.clearDeletedKeys().addDeletedKeys(deletedKeys); - - estimatedPurgeKeysSize = tempBuilder.build().getSerializedSize(); - if (currSize + estimatedPurgeKeysSize > ratisLimit) { + if (currSize + estimatedKeySize > ratisLimit) { batchCapacityReached = true; break; } + currSize += estimatedKeySize; batchPurgeKeys.add(nextKey); purgeKeyIndex++; } @@ -343,6 +336,12 @@ private Pair submitPurgeKeysRequest( return Pair.of(deletedCount, purgeSuccess); } + // Helper: estimate protobuf serialized size of a string field + private static 
int estimateStringEntrySize(String key) { + int len = key.getBytes(StandardCharsets.UTF_8).length; + return 1 /* tag size */ + 1 /* length variant */ + len; /* actual string bytes */ + } + @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); From 95b67a43cbf3176eeb06401133c1cad187a39fed Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Wed, 20 Aug 2025 22:17:51 +0530 Subject: [PATCH 6/9] Addressed review comments. --- ...mBlockLocationProtocolClientSideTranslatorPB.java | 5 +++-- .../hadoop/ozone/om/service/KeyDeletingService.java | 12 ++++-------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index 544193521bc8..1d9ed946191b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -253,7 +253,8 @@ public List deleteKeyBlocks( int serializedSize = 0; for (BlockGroup bg : keyBlocksInfoList) { - int currSize = bg.getProto().getSerializedSize(); + KeyBlocks bgProto = bg.getProto(); + int currSize = bgProto.getSerializedSize(); if (currSize + serializedSize > ratisByteLimit) { allResults.addAll(submitDeleteKeyBlocks(batch)); if (LOG.isDebugEnabled()) { @@ -262,7 +263,7 @@ public List deleteKeyBlocks( serializedSize = 0; batch.clear(); } - batch.add(bg.getProto()); + batch.add(bgProto); serializedSize += currSize; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 
8063003b3ed8..2371c18e0ae3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -154,8 +154,8 @@ Pair, Boolean> processKeyDeletes(List keyBlocksL keyBlocksList.size(), Time.monotonicNow() - startTime); if (blockDeletionResults != null) { long purgeStartTime = Time.monotonicNow(); - purgeResult = submitPurgeKeysRequest(blockDeletionResults, - keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId, ratisByteLimit, keyBlockReplicatedSize); + purgeResult = submitPurgeKeysRequest(blockDeletionResults, keysToModify, renameEntries, snapTableKey, + expectedPreviousSnapshotId, ratisByteLimit, keyBlockReplicatedSize); int limit = getOzoneManager().getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. 
Limit per task is {}.", @@ -292,9 +292,7 @@ private Pair, Boolean> submitPurgeKeysRequest( while (!batchCapacityReached && updateIndex < keysToUpdateList.size()) { OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = keysToUpdateList.get(updateIndex); - PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone(); - tempBuilder.addKeysToUpdate(nextUpdate); - int estimatedSize = tempBuilder.build().getSerializedSize(); + int estimatedSize = nextUpdate.getSerializedSize(); if (currSize + estimatedSize > ratisLimit) { batchCapacityReached = true; @@ -311,9 +309,7 @@ private Pair, Boolean> submitPurgeKeysRequest( renameIndex < renameEntriesToBeDeleted.size()) { String nextRename = renameEntriesToBeDeleted.get(renameIndex); - PurgeKeysRequest.Builder tempBuilder = requestBuilder.clone(); - tempBuilder.addRenamedKeys(nextRename); - int estimatedSize = tempBuilder.build().getSerializedSize(); + int estimatedSize = estimateStringEntrySize(nextRename); if (currSize + estimatedSize > ratisLimit) { break; From 1e76cf3a8479eb5b3fe77677785ace533c74f21b Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Sun, 24 Aug 2025 10:21:17 +0530 Subject: [PATCH 7/9] Addressed comments. 
--- .../hadoop/ozone/util/ProtobufUtils.java | 9 + .../ozone/om/service/KeyDeletingService.java | 171 +++++++++--------- .../om/service/TestKeyDeletingService.java | 8 +- 3 files changed, 102 insertions(+), 86 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java index 7135b1917b95..10c3669a94e5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.util; +import com.google.protobuf.CodedOutputStream; import java.util.UUID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -37,4 +38,12 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) { public static UUID fromProtobuf(HddsProtos.UUID proto) { return new UUID(proto.getMostSigBits(), proto.getLeastSigBits()); } + + /** + * Computes the serialized size of a string in a repeated string field. + * Wraps protobuf's computeStringSizeNoTag for safer use. 
+ */ + public static int computeRepeatedStringSize(String value) { + return CodedOutputStream.computeStringSizeNoTag(value); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 2371c18e0ae3..4458052065c4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -25,7 +25,6 @@ import com.google.protobuf.ServiceException; import java.io.IOException; import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -51,7 +50,6 @@ import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; @@ -70,6 +68,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.NullableUUID; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest; +import org.apache.hadoop.ozone.util.ProtobufUtils; import org.apache.hadoop.util.Time; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; @@ -118,7 +117,7 @@ public KeyDeletingService(OzoneManager ozoneManager, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, StorageUnit.BYTES); // always go to 90% of max limit for request as other header will be added - this.ratisByteLimit = (int) (limit * 
RATIS_LIMIT_FACTOR); + this.ratisByteLimit = (int) Math.max(limit * RATIS_LIMIT_FACTOR, 1); } /** @@ -238,111 +237,123 @@ private Pair, Boolean> submitPurgeKeysRequest( } } + if (purgeKeys.isEmpty() && keysToUpdateList.isEmpty() && + (renameEntriesToBeDeleted == null || renameEntriesToBeDeleted.isEmpty())) { + return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess); + } + int purgeKeyIndex = 0, updateIndex = 0, renameIndex = 0; - int currSize = 0; - boolean batchCapacityReached; + PurgeKeysRequest.Builder requestBuilder = setSnapTableKeyAndPrevSnapId(snapTableKey, expectedPreviousSnapshotId); + int currSize = requestBuilder.build().getSerializedSize(); + int baseSize = currSize; while (purgeKeyIndex < purgeKeys.size() || updateIndex < keysToUpdateList.size() || (renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size())) { - batchCapacityReached = false; - PurgeKeysRequest.Builder requestBuilder = PurgeKeysRequest.newBuilder(); - - if (snapTableKey != null) { - requestBuilder.setSnapshotTableKey(snapTableKey); - } - if (expectedPreviousSnapshotId != null) { - requestBuilder.setExpectedPreviousSnapshotID( - NullableUUID.newBuilder() - .setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId)) - .build()); - } - - currSize = requestBuilder.build().getSerializedSize(); - - // 3.1 Add purgeKeys - List batchPurgeKeys = new ArrayList<>(); - - while (purgeKeyIndex < purgeKeys.size()) { + // 3.1 Purge keys (one at a time) + if (purgeKeyIndex < purgeKeys.size()) { String nextKey = purgeKeys.get(purgeKeyIndex); + int estimatedKeySize = ProtobufUtils.computeRepeatedStringSize(nextKey); - int estimatedKeySize = estimateStringEntrySize(nextKey); - - if (currSize + estimatedKeySize > ratisLimit) { - batchCapacityReached = true; - break; + if (currSize + estimatedKeySize <= ratisLimit) { + requestBuilder.addDeletedKeys( + OzoneManagerProtocolProtos.DeletedKeys.newBuilder() + .setVolumeName("").setBucketName("") + .addKeys(nextKey) + .build()); 
+ currSize += estimatedKeySize; + purgeKeyIndex++; + } else { + purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); + requestBuilder = setSnapTableKeyAndPrevSnapId(snapTableKey, expectedPreviousSnapshotId); + currSize = baseSize; + continue; } - - currSize += estimatedKeySize; - batchPurgeKeys.add(nextKey); - purgeKeyIndex++; - } - - // Now actually add batchPurgeKeys - if (!batchPurgeKeys.isEmpty()) { - OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder() - .setVolumeName("").setBucketName("") - .addAllKeys(batchPurgeKeys).build(); - requestBuilder.addDeletedKeys(deletedKeys); - currSize = requestBuilder.build().getSerializedSize(); } // 3.2 Add keysToUpdate - while (!batchCapacityReached && updateIndex < keysToUpdateList.size()) { - OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = keysToUpdateList.get(updateIndex); + if (updateIndex < keysToUpdateList.size()) { + OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = + keysToUpdateList.get(updateIndex); int estimatedSize = nextUpdate.getSerializedSize(); - if (currSize + estimatedSize > ratisLimit) { - batchCapacityReached = true; - break; + if (currSize + estimatedSize <= ratisLimit) { + requestBuilder.addKeysToUpdate(nextUpdate); + currSize += estimatedSize; + updateIndex++; + } else { + purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); + requestBuilder = setSnapTableKeyAndPrevSnapId(snapTableKey, expectedPreviousSnapshotId); + currSize = baseSize; + continue; } - - requestBuilder.addKeysToUpdate(nextUpdate); - currSize += estimatedSize; - updateIndex++; } // 3.3 Add renamed keys - while (!batchCapacityReached && renameEntriesToBeDeleted != null && + if (renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size()) { String nextRename = renameEntriesToBeDeleted.get(renameIndex); - int estimatedSize = estimateStringEntrySize(nextRename); + int estimatedSize = 
ProtobufUtils.computeRepeatedStringSize(nextRename); - if (currSize + estimatedSize > ratisLimit) { - break; + if (currSize + estimatedSize <= ratisLimit) { + currSize += estimatedSize; + requestBuilder.addRenamedKeys(nextRename); + renameIndex++; + } else { + purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); + requestBuilder = setSnapTableKeyAndPrevSnapId(snapTableKey, expectedPreviousSnapshotId); + currSize = baseSize; } - - requestBuilder.addRenamedKeys(nextRename); - currSize += estimatedSize; - renameIndex++; } - // Finalize and send this batch - OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder() - .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) - .setPurgeKeysRequest(requestBuilder.build()) - .setClientId(getClientId().toString()) - .build(); - - try (BootstrapStateHandler.Lock lock = - snapTableKey != null ? getBootstrapStateLock().lock() : null) { - OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest); - if (omResponse != null) { - purgeSuccess = purgeSuccess && omResponse.getSuccess(); - } - } catch (ServiceException | InterruptedException e) { - LOG.error("PurgeKey request failed in batch. 
Will retry at next run.", e); - purgeSuccess = false; - // Continue to next batch instead of returning immediately - } + } + // Finalize and send this batch + if (currSize > baseSize) { + purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); } return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess); } + private static PurgeKeysRequest.Builder setSnapTableKeyAndPrevSnapId(String snapTableKey, + UUID expectedPreviousSnapshotId) { + PurgeKeysRequest.Builder requestBuilder = PurgeKeysRequest.newBuilder(); + + if (snapTableKey != null) { + requestBuilder.setSnapshotTableKey(snapTableKey); + } + + NullableUUID.Builder expectedPreviousSnapshotNullableUUID = NullableUUID.newBuilder(); + if (expectedPreviousSnapshotId != null) { + expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId)); + } + requestBuilder.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build()); + return requestBuilder; + } + + private boolean submitPurgeRequest(String snapTableKey, boolean purgeSuccess, + PurgeKeysRequest.Builder requestBuilder) { + + OzoneManagerProtocolProtos.OMRequest omRequest = + OzoneManagerProtocolProtos.OMRequest.newBuilder().setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) + .setPurgeKeysRequest(requestBuilder.build()).setClientId(getClientId().toString()).build(); + + try (Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) { + OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest); + if (omResponse != null) { + purgeSuccess = purgeSuccess && omResponse.getSuccess(); + } + } catch (ServiceException | InterruptedException e) { + LOG.error("PurgeKey request failed in batch. Will retry at next run.", e); + purgeSuccess = false; + // Continue to next batch instead of returning immediately + } + return purgeSuccess; + } + /** * Updates ServiceMetrics for the last run of the service. 
*/ @@ -370,12 +381,6 @@ private void resetMetrics() { getMetrics().setKdsCurRunTimestamp(latestRunTimestamp); } - // Helper: estimate protobuf serialized size of a string field - private static int estimateStringEntrySize(String key) { - int len = key.getBytes(StandardCharsets.UTF_8).length; - return 1 /* tag size */ + 1 /* length variant */ + len; /* actual string bytes */ - } - @Override public BackgroundTaskQueue getTasks() { resetMetrics(); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 3f4588e5b37e..7319697d00f2 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -1124,10 +1124,12 @@ void testPurgeKeysRequestBatching() throws Exception { .as("Batch size " + omRequest.getSerializedSize() + " should be <= ratisLimit " + testRatisLimitBytes) .isLessThanOrEqualTo(testRatisLimitBytes); - // Sum up the keys purged in this particular batch request - if (purgeRequest.getDeletedKeysCount() > 0) { - totalPurgedKeysAcrossBatches += purgeRequest.getDeletedKeys(0).getKeysCount(); + // Sum up all the keys purged in this batch (may be spread across multiple DeletedKeys entries) + int batchKeyCount = 0; + for (OzoneManagerProtocolProtos.DeletedKeys deletedKeys : purgeRequest.getDeletedKeysList()) { + batchKeyCount += deletedKeys.getKeysCount(); } + totalPurgedKeysAcrossBatches += batchKeyCount; } // Assert that the sum of keys across all batches equals the total number of keys initially deleted. From 4cc9c07efeb032d759713c5a9920e8b4a3f35a64 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Sun, 24 Aug 2025 12:06:41 +0530 Subject: [PATCH 8/9] Improved a condition. 
--- .../apache/hadoop/ozone/om/service/KeyDeletingService.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 4458052065c4..adb39b27cd63 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -299,8 +299,8 @@ private Pair, Boolean> submitPurgeKeysRequest( int estimatedSize = ProtobufUtils.computeRepeatedStringSize(nextRename); if (currSize + estimatedSize <= ratisLimit) { - currSize += estimatedSize; requestBuilder.addRenamedKeys(nextRename); + currSize += estimatedSize; renameIndex++; } else { purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); @@ -311,7 +311,9 @@ private Pair, Boolean> submitPurgeKeysRequest( } // Finalize and send this batch - if (currSize > baseSize) { + if (requestBuilder.getDeletedKeysCount() > 0 + || requestBuilder.getKeysToUpdateCount() > 0 + || requestBuilder.getRenamedKeysCount() > 0) { purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); } From 186f984e39b8b026994c3dcb1bf54ca164f06bd4 Mon Sep 17 00:00:00 2001 From: Aryan Gupta Date: Mon, 25 Aug 2025 00:34:57 +0530 Subject: [PATCH 9/9] Addressed review comments. 
--- .../ozone/om/service/KeyDeletingService.java | 89 ++++++++----------- .../om/service/TestKeyDeletingService.java | 21 ++--- 2 files changed, 47 insertions(+), 63 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index adb39b27cd63..dc5ce21d2773 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -116,7 +116,8 @@ public KeyDeletingService(OzoneManager ozoneManager, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, StorageUnit.BYTES); - // always go to 90% of max limit for request as other header will be added + // Use 90% of the actual Ratis limit to account for protobuf overhead and + // prevent accidentally exceeding the hard limit during request serialization. 
this.ratisByteLimit = (int) Math.max(limit * RATIS_LIMIT_FACTOR, 1); } @@ -243,12 +244,11 @@ private Pair, Boolean> submitPurgeKeysRequest( } int purgeKeyIndex = 0, updateIndex = 0, renameIndex = 0; - PurgeKeysRequest.Builder requestBuilder = setSnapTableKeyAndPrevSnapId(snapTableKey, expectedPreviousSnapshotId); + PurgeKeysRequest.Builder requestBuilder = getPurgeKeysRequest(snapTableKey, expectedPreviousSnapshotId); int currSize = requestBuilder.build().getSerializedSize(); int baseSize = currSize; - while (purgeKeyIndex < purgeKeys.size() || - updateIndex < keysToUpdateList.size() || + while (purgeKeyIndex < purgeKeys.size() || updateIndex < keysToUpdateList.size() || (renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size())) { // 3.1 Purge keys (one at a time) @@ -256,71 +256,54 @@ private Pair, Boolean> submitPurgeKeysRequest( String nextKey = purgeKeys.get(purgeKeyIndex); int estimatedKeySize = ProtobufUtils.computeRepeatedStringSize(nextKey); - if (currSize + estimatedKeySize <= ratisLimit) { - requestBuilder.addDeletedKeys( - OzoneManagerProtocolProtos.DeletedKeys.newBuilder() - .setVolumeName("").setBucketName("") - .addKeys(nextKey) - .build()); - currSize += estimatedKeySize; - purgeKeyIndex++; - } else { - purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); - requestBuilder = setSnapTableKeyAndPrevSnapId(snapTableKey, expectedPreviousSnapshotId); - currSize = baseSize; - continue; - } - } + requestBuilder.addDeletedKeys( + OzoneManagerProtocolProtos.DeletedKeys.newBuilder().setVolumeName("").setBucketName("").addKeys(nextKey) + .build()); + currSize += estimatedKeySize; + purgeKeyIndex++; - // 3.2 Add keysToUpdate - if (updateIndex < keysToUpdateList.size()) { - OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = - keysToUpdateList.get(updateIndex); + } else if (updateIndex < keysToUpdateList.size()) { + // 3.2 Add keysToUpdate + OzoneManagerProtocolProtos.SnapshotMoveKeyInfos nextUpdate = 
keysToUpdateList.get(updateIndex); int estimatedSize = nextUpdate.getSerializedSize(); - if (currSize + estimatedSize <= ratisLimit) { - requestBuilder.addKeysToUpdate(nextUpdate); - currSize += estimatedSize; - updateIndex++; - } else { - purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); - requestBuilder = setSnapTableKeyAndPrevSnapId(snapTableKey, expectedPreviousSnapshotId); - currSize = baseSize; - continue; - } - } + requestBuilder.addKeysToUpdate(nextUpdate); + currSize += estimatedSize; + updateIndex++; - // 3.3 Add renamed keys - if (renameEntriesToBeDeleted != null && - renameIndex < renameEntriesToBeDeleted.size()) { + } else if (renameEntriesToBeDeleted != null && renameIndex < renameEntriesToBeDeleted.size()) { + // 3.3 Add renamed keys String nextRename = renameEntriesToBeDeleted.get(renameIndex); int estimatedSize = ProtobufUtils.computeRepeatedStringSize(nextRename); - if (currSize + estimatedSize <= ratisLimit) { - requestBuilder.addRenamedKeys(nextRename); - currSize += estimatedSize; - renameIndex++; - } else { - purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); - requestBuilder = setSnapTableKeyAndPrevSnapId(snapTableKey, expectedPreviousSnapshotId); - currSize = baseSize; - } + requestBuilder.addRenamedKeys(nextRename); + currSize += estimatedSize; + renameIndex++; } - } - // Finalize and send this batch - if (requestBuilder.getDeletedKeysCount() > 0 - || requestBuilder.getKeysToUpdateCount() > 0 - || requestBuilder.getRenamedKeysCount() > 0) { - purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); + // Flush either when limit is hit, or at the very end if items remain + boolean allDone = purgeKeyIndex == purgeKeys.size() && updateIndex == keysToUpdateList.size() && + (renameEntriesToBeDeleted == null || renameIndex == renameEntriesToBeDeleted.size()); + + if (currSize >= ratisLimit || (allDone && hasPendingItems(requestBuilder))) { + purgeSuccess = 
submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); + requestBuilder = getPurgeKeysRequest(snapTableKey, expectedPreviousSnapshotId); + currSize = baseSize; + } } return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess); } - private static PurgeKeysRequest.Builder setSnapTableKeyAndPrevSnapId(String snapTableKey, + private boolean hasPendingItems(PurgeKeysRequest.Builder builder) { + return builder.getDeletedKeysCount() > 0 + || builder.getKeysToUpdateCount() > 0 + || builder.getRenamedKeysCount() > 0; + } + + private static PurgeKeysRequest.Builder getPurgeKeysRequest(String snapTableKey, UUID expectedPreviousSnapshotId) { PurgeKeysRequest.Builder requestBuilder = PurgeKeysRequest.newBuilder(); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 7319697d00f2..79bd04171d9f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -1049,7 +1049,8 @@ void testPurgeKeysRequestBatching() throws Exception { // Define a small Ratis limit to force multiple batches for testing // The actual byte size of protobuf messages depends on content. // A small value like 1KB or 2KB should ensure batching for ~10-20 keys. - final int testRatisLimitBytes = 1024; // 2 KB to encourage multiple batches + final int actualRatisLimitBytes = 1138; + final int testRatisLimitBytes = 1024; // 1 KB to encourage multiple batches, ~90% of actualRatisLimitBytes.
// Create a fresh configuration for this test to control the Ratis limit OzoneConfiguration testConf = new OzoneConfiguration(); @@ -1118,18 +1119,18 @@ void testPurgeKeysRequestBatching() throws Exception { OzoneManagerProtocolProtos.PurgeKeysRequest purgeRequest = omRequest.getPurgeKeysRequest(); - // Assert that the serialized size of each batch is within the configured ratisLimit - // We use isLessThanOrEqualTo because the batching logic ensures it doesn't exceed the limit. + // At runtime we enforce ~90% of the Ratis limit as a safety margin, + // but in tests we assert against the actual limit to avoid false negatives. + // This ensures no batch ever exceeds the true Ratis size limit. assertThat(omRequest.getSerializedSize()) - .as("Batch size " + omRequest.getSerializedSize() + " should be <= ratisLimit " + testRatisLimitBytes) - .isLessThanOrEqualTo(testRatisLimitBytes); + .as("Batch size " + omRequest.getSerializedSize() + " should be <= ratisLimit " + actualRatisLimitBytes) + .isLessThanOrEqualTo(actualRatisLimitBytes); // Sum up all the keys purged in this batch (may be spread across multiple DeletedKeys entries) - int batchKeyCount = 0; - for (OzoneManagerProtocolProtos.DeletedKeys deletedKeys : purgeRequest.getDeletedKeysList()) { - batchKeyCount += deletedKeys.getKeysCount(); - } - totalPurgedKeysAcrossBatches += batchKeyCount; + totalPurgedKeysAcrossBatches += purgeRequest.getDeletedKeysList() + .stream() + .mapToInt(OzoneManagerProtocolProtos.DeletedKeys::getKeysCount) + .sum(); } // Assert that the sum of keys across all batches equals the total number of keys initially deleted.