diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java index 7135b1917b95..10c3669a94e5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ProtobufUtils.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.util; +import com.google.protobuf.CodedOutputStream; import java.util.UUID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -37,4 +38,12 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) { public static UUID fromProtobuf(HddsProtos.UUID proto) { return new UUID(proto.getMostSigBits(), proto.getLeastSigBits()); } + + /** + * Computes the serialized size of a string in a repeated string field. + * Wraps protobuf's computeStringSizeNoTag for safer use. + */ + public static int computeRepeatedStringSize(String value) { + return CodedOutputStream.computeStringSizeNoTag(value); + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index a7324d481c48..1d9ed946191b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse; @@ -51,6 +53,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type; import org.apache.hadoop.hdds.scm.AddSCMRequest; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -68,6 +71,8 @@ import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is the client-side translator to translate the requests made on @@ -78,6 +83,12 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable { + private static final Logger LOG = + LoggerFactory.getLogger(ScmBlockLocationProtocolClientSideTranslatorPB.class); + + private static final double RATIS_LIMIT_FACTOR = 0.9; + private int ratisByteLimit; + /** * RpcController is not used and hence is set to null. */ @@ -93,12 +104,18 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB * failover proxy provider. */ public ScmBlockLocationProtocolClientSideTranslatorPB( - SCMBlockLocationFailoverProxyProvider proxyProvider) { + SCMBlockLocationFailoverProxyProvider proxyProvider, OzoneConfiguration conf) { Preconditions.checkState(proxyProvider != null); this.failoverProxyProvider = proxyProvider; this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create( ScmBlockLocationProtocolPB.class, failoverProxyProvider, failoverProxyProvider.getRetryPolicy()); + int limit = (int) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT, + ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + // always go to 90% of max limit for request as other header will be added + this.ratisByteLimit = (int) (limit * RATIS_LIMIT_FACTOR); } /** @@ -230,18 +247,43 @@ public List allocateBlock( @Override public List deleteKeyBlocks( List keyBlocksInfoList) throws IOException { - List keyBlocksProto = keyBlocksInfoList.stream() - .map(BlockGroup::getProto).collect(Collectors.toList()); + + List allResults = new ArrayList<>(); + List batch = new ArrayList<>(); + + int serializedSize = 0; + for (BlockGroup bg : keyBlocksInfoList) { + KeyBlocks bgProto = bg.getProto(); + int currSize = bgProto.getSerializedSize(); + if (currSize + serializedSize > ratisByteLimit) { + allResults.addAll(submitDeleteKeyBlocks(batch)); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending batch of {} KeyBlocks (~{} bytes)", batch.size(), serializedSize); + } + serializedSize = 0; + batch.clear(); + } + batch.add(bgProto); + serializedSize += currSize; + } + + if (!batch.isEmpty()) { + allResults.addAll(submitDeleteKeyBlocks(batch)); + } + + return allResults; + } + + private List submitDeleteKeyBlocks(List batch) + throws IOException { DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto .newBuilder() - .addAllKeyBlocks(keyBlocksProto) + .addAllKeyBlocks(batch) .build(); - SCMBlockLocationRequest wrapper = createSCMBlockRequest( Type.DeleteScmKeyBlocks) .setDeleteScmKeyBlocksRequest(request) .build(); - final SCMBlockLocationResponse wrappedResponse = handleError(submitRequest(wrapper)); final DeleteScmKeyBlocksResponseProto resp = diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java index 81df3a4cefb3..406736f53107 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java @@ -131,7 +131,7 @@ public static ScmBlockLocationProtocol getScmBlockClient( OzoneConfiguration conf) { ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = new ScmBlockLocationProtocolClientSideTranslatorPB( - new SCMBlockLocationFailoverProxyProvider(conf)); + new SCMBlockLocationFailoverProxyProvider(conf), conf); return TracingUtil .createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class, conf); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestFailoverWithSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestFailoverWithSCMHA.java index 41d7d2690a71..68617011d71c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestFailoverWithSCMHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestFailoverWithSCMHA.java @@ -112,7 +112,7 @@ public void testFailover() throws Exception { failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId()); ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = new ScmBlockLocationProtocolClientSideTranslatorPB( - failoverProxyProvider); + failoverProxyProvider, conf); GenericTestUtils .setLogLevel(SCMBlockLocationFailoverProxyProvider.class, Level.DEBUG); LogCapturer logCapture = LogCapturer.captureLogs(SCMBlockLocationFailoverProxyProvider.class); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java index 7cc308feee7a..8a88a72b9909 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestGetClusterTreeInformation.java @@ -59,7 +59,7 @@ public void testGetClusterTreeInformation() throws IOException { failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId()); ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = new ScmBlockLocationProtocolClientSideTranslatorPB( - failoverProxyProvider); + failoverProxyProvider, conf); InnerNode expectedInnerNode = (InnerNode) scm.getClusterMap().getNode(ROOT); InnerNode actualInnerNode = scmBlockLocationClient.getNetworkTopology(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index bf429ad01dd0..dc5ce21d2773 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -40,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; @@ -49,7 +50,6 @@ import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; @@ -68,6 +68,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.NullableUUID; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest; +import org.apache.hadoop.ozone.util.ProtobufUtils; import org.apache.hadoop.util.Time; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; @@ -88,6 +89,9 @@ public class KeyDeletingService extends AbstractKeyDeletingService { private final AtomicLong deletedKeyCount; private final boolean deepCleanSnapshots; private final SnapshotChainManager snapshotChainManager; + private int ratisByteLimit; + private static final double RATIS_LIMIT_FACTOR = 0.9; + // Track metrics for current task execution private long latestRunTimestamp = 0L; private final DeletionStats aosDeletionStats = new DeletionStats(); @@ -108,6 +112,13 @@ public KeyDeletingService(OzoneManager ozoneManager, this.deepCleanSnapshots = deepCleanSnapshots; this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager(); this.scmClient = scmClient; + int limit = (int) ozoneManager.getConfiguration().getStorageSize( + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + // Use 90% of the actual Ratis limit to account for protobuf overhead and + // prevent accidentally exceeding the hard limit during request serialization. + this.ratisByteLimit = (int) Math.max(limit * RATIS_LIMIT_FACTOR, 1); } /** @@ -143,8 +154,8 @@ Pair, Boolean> processKeyDeletes(List keyBlocksL keyBlocksList.size(), Time.monotonicNow() - startTime); if (blockDeletionResults != null) { long purgeStartTime = Time.monotonicNow(); - purgeResult = submitPurgeKeysRequest(blockDeletionResults, - keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId, keyBlockReplicatedSize); + purgeResult = submitPurgeKeysRequest(blockDeletionResults, keysToModify, renameEntries, snapTableKey, + expectedPreviousSnapshotId, ratisByteLimit, keyBlockReplicatedSize); int limit = getOzoneManager().getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. Limit per task is {}.", @@ -160,10 +171,16 @@ Pair, Boolean> processKeyDeletes(List keyBlocksL * @param results DeleteBlockGroups returned by SCM. * @param keysToModify Updated list of RepeatedOmKeyInfo */ - private Pair, Boolean> submitPurgeKeysRequest(List results, - Map keysToModify, List renameEntriesToBeDeleted, - String snapTableKey, UUID expectedPreviousSnapshotId, Map keyBlockReplicatedSize) - throws InterruptedException { + @SuppressWarnings("checkstyle:MethodLength") + private Pair, Boolean> submitPurgeKeysRequest( + List results, + Map keysToModify, + List renameEntriesToBeDeleted, + String snapTableKey, + UUID expectedPreviousSnapshotId, + int ratisLimit, + Map keyBlockReplicatedSize) throws InterruptedException { + List purgeKeys = new ArrayList<>(); // Put all keys to be purged in a list @@ -171,6 +188,8 @@ private Pair, Boolean> submitPurgeKeysRequest(List failedDeletedKeys = new HashSet<>(); boolean purgeSuccess = true; + + // Step 1: Process DeleteBlockGroupResults for (DeleteBlockGroupResult result : results) { String deletedKey = result.getObjectKey(); if (result.isSuccess()) { @@ -199,25 +218,7 @@ private Pair, Boolean> submitPurgeKeysRequest(List keysToUpdateList = new ArrayList<>(); if (keysToModify != null) { for (Map.Entry keyToModify : @@ -235,30 +236,107 @@ private Pair, Boolean> submitPurgeKeysRequest(List= ratisLimit || (allDone && hasPendingItems(requestBuilder))) { + purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); + requestBuilder = getPurgeKeysRequest(snapTableKey, expectedPreviousSnapshotId); + currSize = baseSize; } } - OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder() - .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) - .setPurgeKeysRequest(purgeKeysRequest) - .setClientId(getClientId().toString()) - .build(); + return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess); + } + + private boolean hasPendingItems(PurgeKeysRequest.Builder builder) { + return builder.getDeletedKeysCount() > 0 + || builder.getKeysToUpdateCount() > 0 + || builder.getRenamedKeysCount() > 0; + } + + private static PurgeKeysRequest.Builder getPurgeKeysRequest(String snapTableKey, + UUID expectedPreviousSnapshotId) { + PurgeKeysRequest.Builder requestBuilder = PurgeKeysRequest.newBuilder(); + + if (snapTableKey != null) { + requestBuilder.setSnapshotTableKey(snapTableKey); + } + + NullableUUID.Builder expectedPreviousSnapshotNullableUUID = NullableUUID.newBuilder(); + if (expectedPreviousSnapshotId != null) { + expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId)); + } + requestBuilder.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build()); + return requestBuilder; + } + + private boolean submitPurgeRequest(String snapTableKey, boolean purgeSuccess, + PurgeKeysRequest.Builder requestBuilder) { + + OzoneManagerProtocolProtos.OMRequest omRequest = + OzoneManagerProtocolProtos.OMRequest.newBuilder().setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) + .setPurgeKeysRequest(requestBuilder.build()).setClientId(getClientId().toString()).build(); - // Submit PurgeKeys request to OM. Acquire bootstrap lock when processing deletes for snapshots. - try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) { + try (Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) { OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest); if (omResponse != null) { purgeSuccess = purgeSuccess && omResponse.getSuccess(); } - } catch (ServiceException e) { - LOG.error("PurgeKey request failed. Will retry at next run.", e); - return Pair.of(Pair.of(0, 0L), false); + } catch (ServiceException | InterruptedException e) { + LOG.error("PurgeKey request failed in batch. Will retry at next run.", e); + purgeSuccess = false; + // Continue to next batch instead of returning immediately } - - return Pair.of(Pair.of(deletedCount, deletedReplSize), purgeSuccess); + return purgeSuccess; } /** diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 505b5d4845c4..79bd04171d9f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -35,6 +35,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mockStatic; @@ -45,6 +46,7 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -65,6 +67,7 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; @@ -74,6 +77,7 @@ import org.apache.hadoop.ozone.om.DeletingServiceMetrics; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.KeyManagerImpl; +import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; @@ -114,6 +118,7 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -1038,21 +1043,156 @@ void testLastRunAnd24hMetrics() throws Exception { } } + @Test + @DisplayName("Verify PurgeKeysRequest is batched according to Ratis byte limit") + void testPurgeKeysRequestBatching() throws Exception { + // Define a small Ratis limit to force multiple batches for testing + // The actual byte size of protobuf messages depends on content. + // A small value like 1KB or 2KB should ensure batching for ~10-20 keys. + final int actualRatisLimitBytes = 1138; + final int testRatisLimitBytes = 1024; // 2 KB to encourage multiple batches, 90% of the actualRatisLimitBytes. + + // Create a fresh configuration for this test to control the Ratis limit + OzoneConfiguration testConf = new OzoneConfiguration(); + File innerTestDir = Files.createTempDirectory("TestKDS").toFile(); + ServerUtils.setOzoneMetaDirPath(testConf, innerTestDir.toString()); + + testConf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS); + // Set the specific Ratis limit for this test + testConf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + testRatisLimitBytes, StorageUnit.BYTES); + testConf.setQuietMode(false); + + ScmBlockLocationTestingClient testScmBlockTestingClient = new ScmBlockLocationTestingClient(null, null, 0); + OmTestManagers testOmTestManagers = new OmTestManagers(testConf, testScmBlockTestingClient, null); + KeyManager testKeyManager = testOmTestManagers.getKeyManager(); + testKeyManager.getDeletingService().suspend(); + KeyDeletingService testKds = (KeyDeletingService) testKeyManager.getDeletingService(); + OzoneManager testOm = testOmTestManagers.getOzoneManager(); + + try (MockedStatic mockedRatisUtils = + mockStatic(OzoneManagerRatisUtils.class, CALLS_REAL_METHODS)) { + + // Capture all OMRequests submitted via Ratis + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(OzoneManagerProtocolProtos.OMRequest.class); + + // Mock submitRequest to capture requests and return success + mockedRatisUtils.when(() -> OzoneManagerRatisUtils.submitRequest( + any(OzoneManager.class), + requestCaptor.capture(), // Capture the OMRequest here + any(), + anyLong())) + .thenAnswer(invocation -> { + // Return a successful OMResponse for each captured request + return OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) + .setStatus(OzoneManagerProtocolProtos.Status.OK) + .build(); + }); + + final int numKeysToCreate = 50; // Create enough keys to ensure multiple batches + // Create and delete keys using the test-specific managers + createAndDeleteKeys(numKeysToCreate, 1, testOmTestManagers); + + testKds.resume(); + + // Manually trigger the KeyDeletingService to run its task immediately. + // This will initiate the purge requests to Ratis. + testKds.runPeriodicalTaskNow(); + + // Verify that submitRequest was called multiple times. + // The exact number of calls depends on the key size and testRatisLimitBytes, + // but it must be more than one to confirm batching. + mockedRatisUtils.verify(() -> OzoneManagerRatisUtils.submitRequest( + any(OzoneManager.class), any(OzoneManagerProtocolProtos.OMRequest.class), any(), anyLong()), + atLeast(2)); // At least 2 calls confirms batching + + // Get all captured requests that were sent + List capturedRequests = requestCaptor.getAllValues(); + int totalPurgedKeysAcrossBatches = 0; + + // Iterate through each captured Ratis request (batch) + for (OzoneManagerProtocolProtos.OMRequest omRequest : capturedRequests) { + assertNotNull(omRequest); + assertEquals(OzoneManagerProtocolProtos.Type.PurgeKeys, omRequest.getCmdType()); + + OzoneManagerProtocolProtos.PurgeKeysRequest purgeRequest = omRequest.getPurgeKeysRequest(); + + // At runtime we enforce ~90% of the Ratis limit as a safety margin, + // but in tests we assert against the actual limit to avoid false negatives. + // This ensures no batch ever exceeds the true Ratis size limit. + assertThat(omRequest.getSerializedSize()) + .as("Batch size " + omRequest.getSerializedSize() + " should be <= ratisLimit " + actualRatisLimitBytes) + .isLessThanOrEqualTo(actualRatisLimitBytes); + + // Sum up all the keys purged in this batch (may be spread across multiple DeletedKeys entries) + totalPurgedKeysAcrossBatches += purgeRequest.getDeletedKeysList() + .stream() + .mapToInt(OzoneManagerProtocolProtos.DeletedKeys::getKeysCount) + .sum(); + } + + // Assert that the sum of keys across all batches equals the total number of keys initially deleted. + assertEquals(numKeysToCreate, totalPurgedKeysAcrossBatches, + "Total keys purged across all batches should match initial keys deleted."); + + } finally { + // Clean up the temporary OzoneManager and its resources + if (testOm.stop()) { + testOm.join(); + } + // Clean up the temporary directory for this test + org.apache.commons.io.FileUtils.deleteDirectory(innerTestDir); + } + } + private void createAndDeleteKeys(int keyCount, int numBlocks) throws IOException { + createAndDeleteKeysInternal(keyCount, numBlocks, null); + } + + private void createAndDeleteKeys(int keyCount, int numBlocks, OmTestManagers testManager) throws IOException { + createAndDeleteKeysInternal(keyCount, numBlocks, testManager); + } + + private void createAndDeleteKeysInternal(int keyCount, int numBlocks, + OmTestManagers testManager) throws IOException { for (int x = 0; x < keyCount; x++) { final String volumeName = getTestName(); final String bucketName = uniqueObjectName("bucket"); final String keyName = uniqueObjectName("key"); - // Create Volume and Bucket - createVolumeAndBucket(volumeName, bucketName, false); - - // Create the key - OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, - keyName, numBlocks); + if (testManager != null) { + // Create volume and bucket manually in metadata manager + OMRequestTestUtils.addVolumeToOM( + testManager.getKeyManager().getMetadataManager(), + OmVolumeArgs.newBuilder() + .setOwnerName("o") + .setAdminName("a") + .setVolume(volumeName) + .build()); + + OMRequestTestUtils.addBucketToOM( + testManager.getKeyManager().getMetadataManager(), + OmBucketInfo.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setIsVersionEnabled(false) + .build()); + + // Create and delete key via provided writeClient + OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, + keyName, numBlocks, 0, testManager.getWriteClient()); + testManager.getWriteClient().deleteKey(keyArg); + + } else { + // Use default client-based creation + createVolumeAndBucket(volumeName, bucketName, false); - // Delete the key - writeClient.deleteKey(keyArg); + OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, + keyName, numBlocks); + writeClient.deleteKey(keyArg); + } } } @@ -1140,58 +1280,58 @@ private void renameKey(String volumeName, private OmKeyArgs createAndCommitKey(String volumeName, String bucketName, String keyName, int numBlocks) throws IOException { - return createAndCommitKey(volumeName, bucketName, keyName, - numBlocks, 0); + return createAndCommitKey(volumeName, bucketName, keyName, numBlocks, 0, this.writeClient); } private OmKeyArgs createAndCommitKey(String volumeName, - String bucketName, String keyName, int numBlocks, int numUncommitted) - throws IOException { - // Even if no key size is appointed, there will be at least one - // block pre-allocated when key is created - OmKeyArgs keyArg = - new OmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .setAcls(Collections.emptyList()) - .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) - .setDataSize(DATA_SIZE) - .setLocationInfoList(new ArrayList<>()) - .setOwnerName("user" + RandomStringUtils.secure().nextNumeric(5)) - .build(); - //Open and Commit the Key in the Key Manager. - OpenKeySession session = writeClient.openKey(keyArg); + String bucketName, String keyName, int numBlocks, int numUncommitted) throws IOException { + return createAndCommitKey(volumeName, bucketName, keyName, numBlocks, numUncommitted, this.writeClient); + } - // add pre-allocated blocks into args and avoid creating excessive block - OmKeyLocationInfoGroup keyLocationVersions = session.getKeyInfo(). - getLatestVersionLocations(); + private OmKeyArgs createAndCommitKey(String volumeName, + String bucketName, String keyName, int numBlocks, int numUncommitted, + OzoneManagerProtocol customWriteClient) throws IOException { + + OmKeyArgs keyArg = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setAcls(Collections.emptyList()) + .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) + .setDataSize(1000L) + .setLocationInfoList(new ArrayList<>()) + .setOwnerName("user" + RandomStringUtils.secure().nextNumeric(5)) + .build(); + + // Open and Commit the Key in the Key Manager. + OpenKeySession session = customWriteClient.openKey(keyArg); + + OmKeyLocationInfoGroup keyLocationVersions = session.getKeyInfo() + .getLatestVersionLocations(); assert keyLocationVersions != null; - List latestBlocks = keyLocationVersions. - getBlocksLatestVersionOnly(); + + List latestBlocks = keyLocationVersions + .getBlocksLatestVersionOnly(); + int preAllocatedSize = latestBlocks.size(); for (OmKeyLocationInfo block : latestBlocks) { keyArg.addLocationInfo(block); } - // allocate blocks until the blocks num equal to numBlocks LinkedList allocated = new LinkedList<>(); for (int i = 0; i < numBlocks - preAllocatedSize; i++) { - allocated.add(writeClient.allocateBlock(keyArg, session.getId(), - new ExcludeList())); + allocated.add(customWriteClient.allocateBlock(keyArg, session.getId(), new ExcludeList())); } - // remove the blocks not to be committed for (int i = 0; i < numUncommitted; i++) { allocated.removeFirst(); } - // add the blocks to be committed - for (OmKeyLocationInfo block: allocated) { + for (OmKeyLocationInfo block : allocated) { keyArg.addLocationInfo(block); } - writeClient.commitKey(keyArg, session.getId()); + customWriteClient.commitKey(keyArg, session.getId()); return keyArg; }