From 1a179f10b66ef1836cb4343e410d61008d295704 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 24 Mar 2022 14:13:43 +0800 Subject: [PATCH 01/11] Remove open key expire settings in test --- .../apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java | 4 ---- .../org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java | 4 ---- .../src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java | 5 ----- .../org/apache/hadoop/ozone/om/TestSecureOzoneManager.java | 4 ---- 4 files changed, 17 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java index ca8e4799eec1..b2576fefa4ba 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.util.Collections; import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; @@ -40,7 +39,6 @@ import org.apache.commons.io.FileUtils; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD; import org.junit.After; import org.junit.Assert; @@ -86,8 +84,6 @@ public void init() throws Exception { scmId = UUID.randomUUID().toString(); omId = UUID.randomUUID().toString(); conf.setBoolean(OZONE_ACL_ENABLED, true); - conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, - 2, TimeUnit.SECONDS); cluster = MiniOzoneCluster.newBuilder(conf) .setClusterId(clusterId) .setScmId(scmId) diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java index 2aaf78e13a03..65c57c5717c8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java @@ -31,13 +31,11 @@ import java.io.IOException; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD; /** * This class tests MiniOzoneHAClusterImpl. @@ -71,8 +69,6 @@ public void init() throws Exception { conf.setBoolean(OZONE_ACL_ENABLED, true); conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); - conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, - 2, TimeUnit.SECONDS); cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newOMHABuilder(conf) .setClusterId(clusterId) .setScmId(scmId) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java index aaf07e8b7d23..a93e3ce98e20 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java @@ -18,14 +18,11 @@ import java.io.IOException; import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; import 
org.apache.hadoop.security.authentication.client.AuthenticationException; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD; - import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -67,8 +64,6 @@ public static void init() throws Exception { clusterId = UUID.randomUUID().toString(); scmId = UUID.randomUUID().toString(); omId = UUID.randomUUID().toString(); - conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, - 2, TimeUnit.SECONDS); cluster = MiniOzoneCluster.newBuilder(conf) .setClusterId(clusterId) .setScmId(scmId) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java index c94970cad413..32cd7beb1738 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java @@ -43,7 +43,6 @@ import java.security.PublicKey; import java.security.cert.X509Certificate; import java.util.UUID; -import java.util.concurrent.TimeUnit; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; @@ -51,7 +50,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD; import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; import static org.apache.ozone.test.GenericTestUtils.LogCapturer; import static org.apache.ozone.test.GenericTestUtils.getTempPath; @@ -85,8 +83,6 @@ 
public void init() throws Exception { omId = UUID.randomUUID().toString(); conf.setBoolean(OZONE_ACL_ENABLED, true); conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true); - conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, - 2, TimeUnit.SECONDS); conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString()); conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2); conf.set(OZONE_SCM_NAMES, "localhost"); From c606aa94f90ff9d8c9b3070c662be2088c956a4f Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 6 Jun 2022 17:29:27 +0800 Subject: [PATCH 02/11] dispatch OpenKeyDeleteRequest --- .../src/main/proto/OmClientProtocol.proto | 1 + .../ozone/om/ratis/utils/OzoneManagerRatisUtils.java | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 77968815dfed..4b6a8dd34260 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -1151,6 +1151,7 @@ message PurgePathRequest { message DeleteOpenKeysRequest { repeated OpenKeyBucket openKeysPerBucket = 1; + optional BucketLayoutProto bucketLayout = 2; } message OpenKeyBucket { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 86254064c367..1c2224f58de9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketSetAclRequest; import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest; import 
org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO; +import org.apache.hadoop.ozone.om.request.key.OMOpenKeysDeleteRequest; import org.apache.hadoop.ozone.om.request.key.OMTrashRecoverRequest; import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest; import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequestWithFSO; @@ -210,6 +211,13 @@ public static OMClientRequest createClientRequest(OMRequest omRequest, return new OMTenantRevokeAdminRequest(omRequest); case SetRangerServiceVersion: return new OMSetRangerServiceVersionRequest(omRequest); + case DeleteOpenKeys: + BucketLayout bktLayout = BucketLayout.DEFAULT; + if (omRequest.getDeleteOpenKeysRequest().hasBucketLayout()) { + bktLayout = BucketLayout.fromProto( + omRequest.getDeleteOpenKeysRequest().getBucketLayout()); + } + return new OMOpenKeysDeleteRequest(omRequest, bktLayout); /* * Key requests that can have multiple variants based on the bucket layout From 8aad903f72446bd977fe47b0d8790bebe6249f18 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 24 Mar 2022 14:46:48 +0800 Subject: [PATCH 03/11] Implement OpenKeyCleanupService --- .../ozone/om/OpenKeyCleanupService.java | 161 ++++++++++++++++-- 1 file changed, 144 insertions(+), 17 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java index 1a4e17c96fdb..4d4322bc8c49 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java @@ -18,18 +18,32 @@ package org.apache.hadoop.ozone.om; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.hdds.conf.ConfigurationSource; 
import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.util.Time; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Duration; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * This is the background service to delete hanging open keys. @@ -43,28 +57,88 @@ public class OpenKeyCleanupService extends BackgroundService { private static final Logger LOG = LoggerFactory.getLogger(OpenKeyCleanupService.class); - private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2; + // Use only a single thread for open key deletion. Multiple threads would read + // from the same table and can send deletion requests for same key multiple + // times. + private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 1; + private final OzoneManager ozoneManager; private final KeyManager keyManager; - private final ScmBlockLocationProtocol scmClient; + // Dummy client ID to use for response, since this is triggered by a + // service, not the client. 
+ private final ClientId clientId = ClientId.randomId(); + private final Duration expireThreshold; + private final int cleanupLimitPerTask; + private final AtomicLong submittedOpenKeyCount; + private final AtomicLong runCount; - public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient, - KeyManager keyManager, int serviceInterval, - long serviceTimeout) { + public OpenKeyCleanupService(OzoneManager ozoneManager, KeyManager keyManager, + ConfigurationSource conf, int serviceInterval, long serviceTimeout) { super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS, OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + this.ozoneManager = ozoneManager; this.keyManager = keyManager; - this.scmClient = scmClient; + + long expireMillis = conf.getTimeDuration( + OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, + OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT, + TimeUnit.MILLISECONDS); + this.expireThreshold = Duration.ofMillis(expireMillis); + + this.cleanupLimitPerTask = conf.getInt( + OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK, + OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT); + + this.submittedOpenKeyCount = new AtomicLong(0); + this.runCount = new AtomicLong(0); + } + + /** + * Returns the number of times this Background service has run. + * + * @return Long, run count. + */ + @VisibleForTesting + public AtomicLong getRunCount() { + return runCount; + } + + /** + * Returns the number of open keys that were submitted for deletion by this + * service. If these keys were committed from the open key table between + * being submitted for deletion and the actual delete operation, they will + * not be deleted. + * + * @return Long count. 
+ */ + @VisibleForTesting + public AtomicLong getSubmittedOpenKeyCount() { + return submittedOpenKeyCount; } @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); - queue.add(new OpenKeyDeletingTask()); + queue.add(new OpenKeyCleanupTask(BucketLayout.DEFAULT)); + queue.add(new OpenKeyCleanupTask(BucketLayout.FILE_SYSTEM_OPTIMIZED)); return queue; } - private class OpenKeyDeletingTask implements BackgroundTask { + private boolean shouldRun() { + return ozoneManager.isLeaderReady(); + } + + private boolean isRatisEnabled() { + return ozoneManager.isRatisEnabled(); + } + + private class OpenKeyCleanupTask implements BackgroundTask { + + private final BucketLayout bucketLayout; + + OpenKeyCleanupTask(BucketLayout bucketLayout) { + this.bucketLayout = bucketLayout; + } @Override public int getPriority() { @@ -73,18 +147,71 @@ public int getPriority() { @Override public BackgroundTaskResult call() throws Exception { - // This method is currently never used. It will be implemented in - // HDDS-4122, and integrated into the rest of the code base in HDDS-4123. + if (!shouldRun()) { + return BackgroundTaskResult.EmptyTaskResult.newResult(); + } + + runCount.incrementAndGet(); + long startTime = Time.monotonicNow(); + List openKeyBuckets = null; try { - // The new API for deleting expired open keys in OM HA will differ - // significantly from the old implementation. - // The old implementation has been removed so the code compiles. 
- keyManager.getExpiredOpenKeys(Duration.ZERO, 0, BucketLayout.DEFAULT); + openKeyBuckets = keyManager.getExpiredOpenKeys(expireThreshold, + cleanupLimitPerTask, bucketLayout); } catch (IOException e) { - LOG.error("Unable to get hanging open keys, retry in" - + " next interval", e); + LOG.error("Unable to get hanging open keys, retry in next interval", e); + } + + if (openKeyBuckets != null && !openKeyBuckets.isEmpty()) { + int numOpenKeys = openKeyBuckets.stream() + .mapToInt(OpenKeyBucket::getKeysCount).sum(); + + OMRequest omRequest = createRequest(openKeyBuckets); + submitRequest(omRequest); + + LOG.debug("Number of expired keys submitted for deletion: {}, elapsed" + + " time: {}ms", numOpenKeys, Time.monotonicNow() - startTime); + submittedOpenKeyCount.addAndGet(numOpenKeys); } return BackgroundTaskResult.EmptyTaskResult.newResult(); } + + private OMRequest createRequest(List openKeyBuckets) { + DeleteOpenKeysRequest request = + DeleteOpenKeysRequest.newBuilder() + .addAllOpenKeysPerBucket(openKeyBuckets) + .build(); + + OMRequest omRequest = OMRequest.newBuilder() + .setCmdType(Type.DeleteOpenKeys) + .setDeleteOpenKeysRequest(request) + .setClientId(clientId.toString()) + .build(); + + return omRequest; + } + + private void submitRequest(OMRequest omRequest) { + try { + if (isRatisEnabled()) { + OzoneManagerRatisServer server = ozoneManager.getOmRatisServer(); + + RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() + .setClientId(clientId) + .setServerId(server.getRaftPeerId()) + .setGroupId(server.getRaftGroupId()) + .setCallId(runCount.get()) + .setMessage(Message.valueOf( + OMRatisHelper.convertRequestToByteString(omRequest))) + .setType(RaftClientRequest.writeRequestType()) + .build(); + + server.submitRequest(omRequest, raftClientRequest); + } else { + ozoneManager.getOmServerProtocol().submitRequest(null, omRequest); + } + } catch (ServiceException e) { + LOG.error("Open key delete request failed. 
Will retry at next run.", e); + } + } } } From 1569de3d2197f3802d0d6512657b43a940cc5289 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 19 Apr 2022 20:23:00 +0800 Subject: [PATCH 04/11] Introduce ozone.om.open.key.cleanup.service.timeout --- .../common/src/main/resources/ozone-default.xml | 13 +++++++++++++ .../org/apache/hadoop/ozone/om/OMConfigKeys.java | 5 +++++ 2 files changed, 18 insertions(+) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 21c580b8cd69..bb0e677a8c8e 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1249,6 +1249,19 @@ + + ozone.om.open.key.cleanup.service.timeout + 300s + OZONE, OM, PERFORMANCE + A timeout value of open key cleanup service. If this is set + greater than 0, the service will stop waiting for the open key deleting + completion after this time. If timeout happens to a large proportion of + open key deletion, this value needs to be increased or + ozone.om.open.key.cleanup.limit.per.task should be decreased. 
+ Unit could be defined with postfix (ns,ms,s,m,h,d) + + + ozone.om.open.key.expire.threshold 7d diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index aa34f049f425..99d56d2b02ee 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -91,6 +91,11 @@ private OMConfigKeys() { public static final String OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT = "24h"; + public static final String OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT = + "ozone.om.open.key.cleanup.service.timeout"; + public static final String OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT + = "300s"; + public static final String OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD = "ozone.om.open.key.expire.threshold"; public static final String OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT = From 578cebd033780f68a0932460f3f9e81b97713eb4 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 19 Apr 2022 20:27:08 +0800 Subject: [PATCH 05/11] Enable open key cleanup service --- .../apache/hadoop/ozone/om/KeyManager.java | 6 ++++ .../hadoop/ozone/om/KeyManagerImpl.java | 28 +++++++++++++++++++ .../ozone/om/OpenKeyCleanupService.java | 10 +++---- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 7bcb00fafbb4..8bcffa60d24f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -225,4 +225,10 @@ List getPendingDeletionSubFiles(long volumeId, * @return Background service. 
*/ BackgroundService getDirDeletingService(); + + /** + * Returns the instance of Open Key Cleanup Service. + * @return Background service. + */ + BackgroundService getOpenKeyCleanupService(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 93c59a524a03..9193ff5edb2a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -121,6 +121,10 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND; @@ -167,6 +171,8 @@ public class KeyManagerImpl implements KeyManager { private final boolean enableFileSystemPaths; private BackgroundService dirDeletingService; + private BackgroundService openKeyCleanupService; + @VisibleForTesting public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, OMMetadataManager metadataManager, OzoneConfiguration conf, String omId, @@ -254,6 +260,20 @@ public void 
start(OzoneConfiguration configuration) { TimeUnit.SECONDS, serviceTimeout, ozoneManager, configuration); dirDeletingService.start(); } + + if (openKeyCleanupService == null) { + long serviceInterval = configuration.getTimeDuration( + OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL, + OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = configuration.getTimeDuration( + OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT, + OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + openKeyCleanupService = new OpenKeyCleanupService(serviceInterval, + TimeUnit.SECONDS, serviceTimeout, ozoneManager, configuration); + openKeyCleanupService.start(); + } } KeyProviderCryptoExtension getKMSProvider() { @@ -270,6 +290,10 @@ public void stop() throws IOException { dirDeletingService.shutdown(); dirDeletingService = null; } + if (openKeyCleanupService != null) { + openKeyCleanupService.shutdown(); + openKeyCleanupService = null; + } } private OmBucketInfo getBucketInfo(String volumeName, String bucketName) @@ -604,6 +628,10 @@ public BackgroundService getDirDeletingService() { return dirDeletingService; } + public BackgroundService getOpenKeyCleanupService() { + return openKeyCleanupService; + } + @Override public OmMultipartUploadList listMultipartUploads(String volumeName, String bucketName, String prefix) throws OMException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java index 4d4322bc8c49..08499712b779 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java @@ -72,12 +72,12 @@ public class OpenKeyCleanupService extends BackgroundService { private final AtomicLong submittedOpenKeyCount; private final AtomicLong 
runCount; - public OpenKeyCleanupService(OzoneManager ozoneManager, KeyManager keyManager, - ConfigurationSource conf, int serviceInterval, long serviceTimeout) { - super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS, - OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout, + OzoneManager ozoneManager, ConfigurationSource conf) { + super("OpenKeyCleanupService", interval, unit, + OPEN_KEY_DELETING_CORE_POOL_SIZE, timeout); this.ozoneManager = ozoneManager; - this.keyManager = keyManager; + this.keyManager = ozoneManager.getKeyManager(); long expireMillis = conf.getTimeDuration( OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, From 94e5095ad4cce8225420838ddb9a2e65c6277823 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 20 Apr 2022 09:54:16 +0800 Subject: [PATCH 06/11] Add TestOpenKeyCleanupService --- .../hadoop/ozone/om/KeyManagerImpl.java | 2 +- .../ozone/om/OpenKeyCleanupService.java | 14 +- .../ozone/om/TestOpenKeyCleanupService.java | 203 ++++++++++++++++++ 3 files changed, 211 insertions(+), 8 deletions(-) create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 9193ff5edb2a..b592d2503671 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -271,7 +271,7 @@ public void start(OzoneConfiguration configuration) { OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); openKeyCleanupService = new OpenKeyCleanupService(serviceInterval, - TimeUnit.SECONDS, serviceTimeout, ozoneManager, configuration); + TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, 
configuration); openKeyCleanupService.start(); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java index 08499712b779..1a6c727ee083 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java @@ -53,11 +53,10 @@ * success for keys, then clean up those keys. */ public class OpenKeyCleanupService extends BackgroundService { - private static final Logger LOG = LoggerFactory.getLogger(OpenKeyCleanupService.class); - // Use only a single thread for open key deletion. Multiple threads would read + // Use only a single thread for OpenKeyCleanup. Multiple threads would read // from the same table and can send deletion requests for same key multiple // times. private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 1; @@ -99,8 +98,8 @@ public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout, * @return Long, run count. */ @VisibleForTesting - public AtomicLong getRunCount() { - return runCount; + public long getRunCount() { + return runCount.get(); } /** @@ -109,11 +108,11 @@ public AtomicLong getRunCount() { * being submitted for deletion and the actual delete operation, they will * not be deleted. * - * @return Long count. + * @return long count. 
*/ @VisibleForTesting - public AtomicLong getSubmittedOpenKeyCount() { - return submittedOpenKeyCount; + public long getSubmittedOpenKeyCount() { + return submittedOpenKeyCount.get(); } @Override @@ -179,6 +178,7 @@ private OMRequest createRequest(List openKeyBuckets) { DeleteOpenKeysRequest request = DeleteOpenKeysRequest.newBuilder() .addAllOpenKeysPerBucket(openKeyBuckets) + .setBucketLayout(bucketLayout.toProto()) .build(); OMRequest omRequest = OMRequest.newBuilder() diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java new file mode 100644 index 000000000000..c009df870439 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.hdds.utils.db.DBConfigFromFile; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.util.ExitUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test Open Key Cleanup Service. + *

+ * This test does the following things. + *

+ * 1. Creates a bunch of open keys. 2. Waits until they pass the expire + * threshold. 3. Waits for the OpenKeyCleanupService to submit them for + * deletion. 4. Confirms that no expired open keys remain. + */ +public class TestOpenKeyCleanupService { + private OzoneManagerProtocol writeClient; + private OzoneManager om; + private static final Logger LOG = + LoggerFactory.getLogger(TestOpenKeyCleanupService.class); + + private static final Duration serviceInterval = Duration.ofMillis(1000); + private static final Duration expireThreshold = Duration.ofMillis(1000); + private OzoneConfiguration conf; + + @BeforeAll + public static void setup() { + ExitUtils.disableSystemExit(); + } + + @BeforeEach + private void createConfAndInitValues(@TempDir Path tempDir) { + conf = new OzoneConfiguration(); + System.setProperty(DBConfigFromFile.CONFIG_DIR, "/"); + ServerUtils.setOzoneMetaDirPath(conf, tempDir.toString()); + conf.setTimeDuration(OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL, + serviceInterval.toMillis(), TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, + expireThreshold.toMillis(), TimeUnit.MILLISECONDS); + conf.setQuietMode(false); + } + + @AfterEach + public void cleanup() throws Exception { + om.stop(); + } + + /** + * In this test, we create a bunch of open keys and let them expire. The + * OpenKeyCleanupService started by the KeyManager should pick them up + * and submit them for deletion. We make sure that no expired open keys are + * left in either bucket layout. + * + * @throws IOException - on Failure.
+ */ + + @Test + @Timeout(300) + public void checkIfCleanupServiceIsDeletingExpiredOpenKeys() + throws IOException, TimeoutException, InterruptedException, + AuthenticationException { + OmTestManagers omTestManagers = new OmTestManagers(conf); + final KeyManager keyManager = omTestManagers.getKeyManager(); + final OMMetadataManager omMetadataManager = + omTestManagers.getMetadataManager(); + + writeClient = omTestManagers.getWriteClient(); + om = omTestManagers.getOzoneManager(); + + final int keyCount = 100; + createOpenKeys(omMetadataManager, keyCount, 1); + + Thread.sleep(expireThreshold.toMillis()); + assertFalse(keyManager.getExpiredOpenKeys(expireThreshold, + 1, BucketLayout.DEFAULT).isEmpty()); + assertFalse(keyManager.getExpiredOpenKeys(expireThreshold, + 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty()); + + OpenKeyCleanupService openKeyCleanupService = + (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService(); + GenericTestUtils.waitFor(() -> openKeyCleanupService + .getSubmittedOpenKeyCount() >= keyCount, + 1000, 10000); + assertTrue(openKeyCleanupService.getRunCount() > 0); + assertTrue(keyManager.getExpiredOpenKeys(expireThreshold, + 1, BucketLayout.DEFAULT).isEmpty()); + assertTrue(keyManager.getExpiredOpenKeys(expireThreshold, + 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty()); + } + + private void createOpenKeys(OMMetadataManager omMetadataManager, + int keyCount, int numBlocks) throws IOException { + for (int x = 0; x < keyCount; x++) { + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + String key = UUID.randomUUID().toString(); + + // Create Volume and Bucket + BucketLayout bucketLayout = (x % 2 == 0) ? 
BucketLayout.DEFAULT + : BucketLayout.FILE_SYSTEM_OPTIMIZED; + createVolumeAndBucket(omMetadataManager, volume, bucket, bucketLayout); + + // Create the key + createOpenKey(omMetadataManager, volume, bucket, key, numBlocks); + } + } + + private void createVolumeAndBucket(OMMetadataManager omMetadataManager, + String volumeName, String bucketName, BucketLayout bucketLayout) + throws IOException { + // cheat here, just create a volume and bucket entry so that we can + // create the keys, we put the same data for key and value since the + // system does not decode the object + OMRequestTestUtils.addVolumeToOM(omMetadataManager, + OmVolumeArgs.newBuilder() + .setOwnerName("o") + .setAdminName("a") + .setVolume(volumeName) + .build()); + + OMRequestTestUtils.addBucketToOM(omMetadataManager, + OmBucketInfo.newBuilder().setVolumeName(volumeName) + .setBucketName(bucketName) + .setBucketLayout(bucketLayout) + .build()); + } + + private void createOpenKey(OMMetadataManager omMetadataManager, + String volumeName, String bucketName, String keyName, int numBlocks) + throws IOException { + OmKeyArgs keyArg = + new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setAcls(Collections.emptyList()) + .setReplicationConfig(StandaloneReplicationConfig.getInstance( + HddsProtos.ReplicationFactor.ONE)) + .setLocationInfoList(new ArrayList<>()) + .build(); + + // Open and write the key without commit it. 
+ OpenKeySession session = writeClient.openKey(keyArg); + for (int i = 0; i < numBlocks; i++) { + keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(), + new ExcludeList())); + } + } +} From 19473030a9849c66ff06065315f2320f33d99980 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 6 Jun 2022 20:15:34 +0800 Subject: [PATCH 07/11] Improve test --- hadoop-ozone/ozone-manager/pom.xml | 4 + .../ozone/om/OpenKeyCleanupService.java | 21 ++++- .../ozone/om/TestOpenKeyCleanupService.java | 84 +++++++++++-------- 3 files changed, 73 insertions(+), 36 deletions(-) diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml index 0201397b4a02..55b426e48ea9 100644 --- a/hadoop-ozone/ozone-manager/pom.xml +++ b/hadoop-ozone/ozone-manager/pom.xml @@ -194,6 +194,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> + + org.junit.jupiter + junit-jupiter-params + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java index 1a6c727ee083..3cb9035149d1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java @@ -43,6 +43,7 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -70,6 +71,7 @@ public class OpenKeyCleanupService extends BackgroundService { private final int cleanupLimitPerTask; private final AtomicLong submittedOpenKeyCount; private final AtomicLong runCount; + private final AtomicBoolean suspended; public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout, OzoneManager ozoneManager, ConfigurationSource conf) { @@ -90,6 +92,7 @@ public OpenKeyCleanupService(long interval, 
TimeUnit unit, long timeout, this.submittedOpenKeyCount = new AtomicLong(0); this.runCount = new AtomicLong(0); + this.suspended = new AtomicBoolean(false); } /** @@ -102,6 +105,22 @@ public long getRunCount() { return runCount.get(); } + /** + * Suspend the service (for testing). + */ + @VisibleForTesting + public void suspend() { + suspended.set(true); + } + + /** + * Resume the service if suspended (for testing). + */ + @VisibleForTesting + public void resume() { + suspended.set(false); + } + /** * Returns the number of open keys that were submitted for deletion by this * service. If these keys were committed from the open key table between @@ -124,7 +143,7 @@ public BackgroundTaskQueue getTasks() { } private boolean shouldRun() { - return ozoneManager.isLeaderReady(); + return !suspended.get() && ozoneManager.isLeaderReady(); } private boolean isRatisEnabled() { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java index c009df870439..969da0b6f0f0 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om; +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -32,15 +33,15 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; -import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.util.ExitUtils; 
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +52,10 @@ import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -73,8 +73,8 @@ public class TestOpenKeyCleanupService { private static final Logger LOG = LoggerFactory.getLogger(TestOpenKeyCleanupService.class); - private static final Duration serviceInterval = Duration.ofMillis(1000); - private static final Duration expireThreshold = Duration.ofMillis(1000); + private static final Duration SERVICE_INTERVAL = Duration.ofMillis(100); + private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(200); private OzoneConfiguration conf; @BeforeAll @@ -83,14 +83,14 @@ public static void setup() { } @BeforeEach - private void createConfAndInitValues(@TempDir Path tempDir) { + public void createConfAndInitValues(@TempDir Path tempDir) { conf = new OzoneConfiguration(); System.setProperty(DBConfigFromFile.CONFIG_DIR, "/"); ServerUtils.setOzoneMetaDirPath(conf, tempDir.toString()); conf.setTimeDuration(OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL, - serviceInterval.toMillis(), TimeUnit.MILLISECONDS); + SERVICE_INTERVAL.toMillis(), TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, - 
expireThreshold.toMillis(), TimeUnit.MILLISECONDS); + EXPIRE_THRESHOLD.toMillis(), TimeUnit.MILLISECONDS); conf.setQuietMode(false); } @@ -108,11 +108,15 @@ public void cleanup() throws Exception { * @throws IOException - on Failure. */ - @Test + @ParameterizedTest + @CsvSource({ + "99, 0", + "0, 88", + "66, 77" + }) @Timeout(300) - public void checkIfCleanupServiceIsDeletingExpiredOpenKeys() - throws IOException, TimeoutException, InterruptedException, - AuthenticationException { + public void checkIfCleanupServiceIsDeletingExpiredOpenKeys( + int numDEFKeys, int numFSOKeys) throws Exception { OmTestManagers omTestManagers = new OmTestManagers(conf); final KeyManager keyManager = omTestManagers.getKeyManager(); final OMMetadataManager omMetadataManager = @@ -121,41 +125,52 @@ public void checkIfCleanupServiceIsDeletingExpiredOpenKeys() writeClient = omTestManagers.getWriteClient(); om = omTestManagers.getOzoneManager(); - final int keyCount = 100; - createOpenKeys(omMetadataManager, keyCount, 1); - - Thread.sleep(expireThreshold.toMillis()); - assertFalse(keyManager.getExpiredOpenKeys(expireThreshold, - 1, BucketLayout.DEFAULT).isEmpty()); - assertFalse(keyManager.getExpiredOpenKeys(expireThreshold, - 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty()); OpenKeyCleanupService openKeyCleanupService = (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService(); + openKeyCleanupService.suspend(); + + final int keyCount = numDEFKeys + numFSOKeys; + createOpenKeys(omMetadataManager, numDEFKeys, BucketLayout.DEFAULT); + createOpenKeys(omMetadataManager, numFSOKeys, + BucketLayout.FILE_SYSTEM_OPTIMIZED); + Thread.sleep(EXPIRE_THRESHOLD.toMillis() + SERVICE_INTERVAL.toMillis()); + assertEquals(numDEFKeys == 0, keyManager.getExpiredOpenKeys( + EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty()); + assertEquals(numFSOKeys == 0, keyManager.getExpiredOpenKeys( + EXPIRE_THRESHOLD, 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty()); + + openKeyCleanupService.resume(); + 
GenericTestUtils.waitFor(() -> openKeyCleanupService .getSubmittedOpenKeyCount() >= keyCount, - 1000, 10000); + (int) SERVICE_INTERVAL.toMillis(), + 10 * (int) SERVICE_INTERVAL.toMillis()); assertTrue(openKeyCleanupService.getRunCount() > 0); - assertTrue(keyManager.getExpiredOpenKeys(expireThreshold, + assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty()); - assertTrue(keyManager.getExpiredOpenKeys(expireThreshold, + assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD, 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty()); } private void createOpenKeys(OMMetadataManager omMetadataManager, - int keyCount, int numBlocks) throws IOException { + int keyCount, BucketLayout bucketLayout) + throws IOException { + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); for (int x = 0; x < keyCount; x++) { - String volume = UUID.randomUUID().toString(); - String bucket = UUID.randomUUID().toString(); + if (RandomUtils.nextBoolean()) { + bucket = UUID.randomUUID().toString(); + if (RandomUtils.nextBoolean()) { + volume = UUID.randomUUID().toString(); + } + } String key = UUID.randomUUID().toString(); - - // Create Volume and Bucket - BucketLayout bucketLayout = (x % 2 == 0) ? 
BucketLayout.DEFAULT - : BucketLayout.FILE_SYSTEM_OPTIMIZED; createVolumeAndBucket(omMetadataManager, volume, bucket, bucketLayout); + final int numBlocks = RandomUtils.nextInt(0, 3); // Create the key - createOpenKey(omMetadataManager, volume, bucket, key, numBlocks); + createOpenKey(volume, bucket, key, numBlocks); } } @@ -179,9 +194,8 @@ private void createVolumeAndBucket(OMMetadataManager omMetadataManager, .build()); } - private void createOpenKey(OMMetadataManager omMetadataManager, - String volumeName, String bucketName, String keyName, int numBlocks) - throws IOException { + private void createOpenKey(String volumeName, String bucketName, + String keyName, int numBlocks) throws IOException { OmKeyArgs keyArg = new OmKeyArgs.Builder() .setVolumeName(volumeName) From 543fecc2a0110ed1570d1a81ca1bf5ecba8658b3 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 6 Jun 2022 20:23:23 +0800 Subject: [PATCH 08/11] Improve test --- .../ozone/om/TestOpenKeyCleanupService.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java index 969da0b6f0f0..610c987b7c96 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java @@ -75,7 +75,8 @@ public class TestOpenKeyCleanupService { private static final Duration SERVICE_INTERVAL = Duration.ofMillis(100); private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(200); - private OzoneConfiguration conf; + private KeyManager keyManager; + private OMMetadataManager omMetadataManager; @BeforeAll public static void setup() { @@ -83,8 +84,8 @@ public static void setup() { } @BeforeEach - public void createConfAndInitValues(@TempDir Path 
tempDir) { - conf = new OzoneConfiguration(); + public void createConfAndInitValues(@TempDir Path tempDir) throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); System.setProperty(DBConfigFromFile.CONFIG_DIR, "/"); ServerUtils.setOzoneMetaDirPath(conf, tempDir.toString()); conf.setTimeDuration(OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL, @@ -92,6 +93,11 @@ public void createConfAndInitValues(@TempDir Path tempDir) { conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, EXPIRE_THRESHOLD.toMillis(), TimeUnit.MILLISECONDS); conf.setQuietMode(false); + OmTestManagers omTestManagers = new OmTestManagers(conf); + keyManager = omTestManagers.getKeyManager(); + omMetadataManager = omTestManagers.getMetadataManager(); + writeClient = omTestManagers.getWriteClient(); + om = omTestManagers.getOzoneManager(); } @AfterEach @@ -117,23 +123,15 @@ public void cleanup() throws Exception { @Timeout(300) public void checkIfCleanupServiceIsDeletingExpiredOpenKeys( int numDEFKeys, int numFSOKeys) throws Exception { - OmTestManagers omTestManagers = new OmTestManagers(conf); - final KeyManager keyManager = omTestManagers.getKeyManager(); - final OMMetadataManager omMetadataManager = - omTestManagers.getMetadataManager(); - - writeClient = omTestManagers.getWriteClient(); - om = omTestManagers.getOzoneManager(); - OpenKeyCleanupService openKeyCleanupService = (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService(); + openKeyCleanupService.suspend(); final int keyCount = numDEFKeys + numFSOKeys; - createOpenKeys(omMetadataManager, numDEFKeys, BucketLayout.DEFAULT); - createOpenKeys(omMetadataManager, numFSOKeys, - BucketLayout.FILE_SYSTEM_OPTIMIZED); + createOpenKeys(numDEFKeys, BucketLayout.DEFAULT); + createOpenKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED); Thread.sleep(EXPIRE_THRESHOLD.toMillis() + SERVICE_INTERVAL.toMillis()); assertEquals(numDEFKeys == 0, keyManager.getExpiredOpenKeys( EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty()); @@ -153,8 
+151,7 @@ public void checkIfCleanupServiceIsDeletingExpiredOpenKeys( 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty()); } - private void createOpenKeys(OMMetadataManager omMetadataManager, - int keyCount, BucketLayout bucketLayout) + private void createOpenKeys(int keyCount, BucketLayout bucketLayout) throws IOException { String volume = UUID.randomUUID().toString(); String bucket = UUID.randomUUID().toString(); From abad72effa446b27f4d26e6fae70374ee3416783 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 6 Jun 2022 20:26:20 +0800 Subject: [PATCH 09/11] Fix checkstyle --- .../apache/hadoop/ozone/om/TestOpenKeyCleanupService.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java index 610c987b7c96..bc07cf114980 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java @@ -163,7 +163,7 @@ private void createOpenKeys(int keyCount, BucketLayout bucketLayout) } } String key = UUID.randomUUID().toString(); - createVolumeAndBucket(omMetadataManager, volume, bucket, bucketLayout); + createVolumeAndBucket(volume, bucket, bucketLayout); final int numBlocks = RandomUtils.nextInt(0, 3); // Create the key @@ -171,9 +171,8 @@ private void createOpenKeys(int keyCount, BucketLayout bucketLayout) } } - private void createVolumeAndBucket(OMMetadataManager omMetadataManager, - String volumeName, String bucketName, BucketLayout bucketLayout) - throws IOException { + private void createVolumeAndBucket(String volumeName, String bucketName, + BucketLayout bucketLayout) throws IOException { // cheat here, just create a volume and bucket entry so that we can // create the keys, we put the same data for key 
and value since the // system does not decode the object From c72cd9ef5d752f002d986b19ab599a2d7756b8b6 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 7 Jun 2022 10:56:24 +0800 Subject: [PATCH 10/11] Increase interval and timeout --- .../apache/hadoop/ozone/om/TestOpenKeyCleanupService.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java index bc07cf114980..00a66dca72bf 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java @@ -73,8 +73,8 @@ public class TestOpenKeyCleanupService { private static final Logger LOG = LoggerFactory.getLogger(TestOpenKeyCleanupService.class); - private static final Duration SERVICE_INTERVAL = Duration.ofMillis(100); - private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(200); + private static final Duration SERVICE_INTERVAL = Duration.ofMillis(500); + private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(1000); private KeyManager keyManager; private OMMetadataManager omMetadataManager; @@ -113,7 +113,6 @@ public void cleanup() throws Exception { * * @throws IOException - on Failure. 
*/ - @ParameterizedTest @CsvSource({ "99, 0", From 2496b997971b3c773fb02f5f0d6e595c90c2e031 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 7 Jun 2022 13:13:39 +0800 Subject: [PATCH 11/11] Tweak wait in test --- .../ozone/om/TestOpenKeyCleanupService.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java index 00a66dca72bf..0fa4114e4cba 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java @@ -127,11 +127,18 @@ public void checkIfCleanupServiceIsDeletingExpiredOpenKeys( (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService(); openKeyCleanupService.suspend(); + // wait for submitted tasks to complete + Thread.sleep(SERVICE_INTERVAL.toMillis()); + final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount(); + final long oldrunCount = openKeyCleanupService.getRunCount(); final int keyCount = numDEFKeys + numFSOKeys; createOpenKeys(numDEFKeys, BucketLayout.DEFAULT); createOpenKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED); - Thread.sleep(EXPIRE_THRESHOLD.toMillis() + SERVICE_INTERVAL.toMillis()); + + // wait for open keys to expire + Thread.sleep(EXPIRE_THRESHOLD.toMillis()); + assertEquals(numDEFKeys == 0, keyManager.getExpiredOpenKeys( EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty()); assertEquals(numFSOKeys == 0, keyManager.getExpiredOpenKeys( @@ -140,10 +147,15 @@ public void checkIfCleanupServiceIsDeletingExpiredOpenKeys( openKeyCleanupService.resume(); GenericTestUtils.waitFor(() -> openKeyCleanupService - .getSubmittedOpenKeyCount() >= keyCount, + .getRunCount() > oldrunCount, (int) SERVICE_INTERVAL.toMillis(), - 10 * (int) 
SERVICE_INTERVAL.toMillis()); - assertTrue(openKeyCleanupService.getRunCount() > 0); + 5 * (int) SERVICE_INTERVAL.toMillis()); + + // wait for requests to complete + Thread.sleep(SERVICE_INTERVAL.toMillis()); + + assertTrue(openKeyCleanupService.getSubmittedOpenKeyCount() >= + oldkeyCount + keyCount); assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty()); assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,