diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 36c1a5370935..7b40f265dc92 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -51,6 +51,12 @@ public abstract class BackgroundService { private final TimeUnit unit; private final PeriodicalTask service; + public BackgroundService(String serviceName, long interval, + TimeUnit unit, int threadPoolSize) { + // Set service timeout to 0 to disable. + this(serviceName, interval, unit, threadPoolSize, 0); + } + public BackgroundService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long serviceTimeout) { this.interval = interval; @@ -117,7 +123,8 @@ public synchronized void run() { LOG.warn("Background task execution failed", e); } finally { long endTime = System.nanoTime(); - if (endTime - startTime > serviceTimeoutInNanos) { + if (serviceTimeoutInNanos > 0 && + endTime - startTime > serviceTimeoutInNanos) { LOG.warn("{} Background task execution took {}ns > {}ns(timeout)", serviceName, endTime - startTime, serviceTimeoutInNanos); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index c5c442627218..083750920d7b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -158,23 +158,6 @@ public final class OzoneConfigKeys { public static final String OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT = "60s"; - /** - * The interval of open key clean service. - */ - public static final String OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS = - "ozone.open.key.cleanup.service.interval.seconds"; - public static final int - OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT - = 24 * 3600; // a total of 24 hour - - /** - * An open key gets cleaned up when it is being in open state for too long. - */ - public static final String OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS = - "ozone.open.key.expire.threshold"; - public static final int OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT = - 24 * 3600; - public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT = "ozone.block.deleting.service.timeout"; public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 47dbf39794fc..7af1bdc10726 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1133,23 +1133,36 @@ - ozone.open.key.cleanup.service.interval.seconds - 86400 + ozone.om.open.key.cleanup.service.interval + 24h OZONE, OM, PERFORMANCE - A background job periodically checks open key entries and delete the expired ones. This entry controls the - interval of this cleanup check. + A background job that periodically checks open key entries and marks + expired open keys for deletion. This entry controls the interval of this + cleanup check. - ozone.open.key.expire.threshold - 86400 + ozone.om.open.key.expire.threshold + 7d OZONE, OM, PERFORMANCE Controls how long an open key operation is considered active. Specifically, if a key has been open longer than the value of this config entry, that open key is considered as - expired (e.g. due to client crash). Default to 24 hours. + expired (e.g. due to client crash). + + + + + ozone.om.open.key.cleanup.limit.per.task + 1000 + OZONE, OM, PERFORMANCE + + The maximum number of open keys to be identified as expired and marked + for deletion by one run of the open key cleanup service on the OM. + This property is used to throttle the actual number of open key deletions + on the OM. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 3ad4ab9e0918..85c9bcb6e7b4 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -87,6 +87,21 @@ private OMConfigKeys() { "ozone.key.deleting.limit.per.task"; public static final int OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT = 20000; + public static final String OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL = + "ozone.om.open.key.cleanup.service.interval"; + public static final TimeDuration + OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT = + TimeDuration.valueOf(24, TimeUnit.HOURS); + + public static final String OZONE_OPEN_KEY_EXPIRE_THRESHOLD = + "ozone.om.open.key.expire.threshold"; + public static final TimeDuration OZONE_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT = + TimeDuration.valueOf(7, TimeUnit.DAYS); + + public static final String OZONE_OPEN_KEY_CLEANUP_LIMIT_PER_TASK = + "ozone.om.open.key.cleanup.limit.per.task"; + public static final int OZONE_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT = 1000; + public static final String OZONE_OM_METRICS_SAVE_INTERVAL = "ozone.om.save.metrics.interval"; public static final String OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT = "5m"; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneHACluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneHACluster.java index 051eb94d582e..3837b8023b6d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneHACluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneHACluster.java @@ -33,12 +33,13 @@ import java.io.IOException; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD; /** * This class tests MiniOzoneHAClusterImpl. @@ -73,7 +74,7 @@ public void init() throws Exception { conf.setBoolean(OZONE_ACL_ENABLED, true); conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); - conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); + conf.setTimeDuration(OZONE_OPEN_KEY_EXPIRE_THRESHOLD, 2, TimeUnit.SECONDS); cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) .setClusterId(clusterId) .setScmId(scmId) @@ -81,6 +82,7 @@ public void init() throws Exception { .setNumOfOzoneManagers(numOfOMs) .build(); cluster.waitForClusterToBeReady(); + cluster.restartOzoneManager(); objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf) .getObjectStore(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java index 2c66885e3534..c61daafcc3a4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java @@ -43,7 +43,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH; import static org.apache.hadoop.ozone.om.OMDBCheckpointServlet.writeOmDBCheckpointToStream; @@ -93,7 +92,6 @@ public void init() throws Exception { omId = UUID.randomUUID().toString(); conf.setBoolean(OZONE_ACL_ENABLED, false); conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); - conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); cluster = MiniOzoneCluster.newBuilder(conf) .setClusterId(clusterId) .setScmId(scmId) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java index 271109f0b3c7..08100cfa0c25 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java @@ -36,8 +36,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import org.junit.AfterClass; + import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Rule; @@ -80,7 +80,6 @@ public static void init() throws Exception { scmId = UUID.randomUUID().toString(); omId = UUID.randomUUID().toString(); conf.setBoolean(OZONE_ACL_ENABLED, true); - conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); conf.setClass(OZONE_ACL_AUTHORIZER_CLASS, OzoneAccessAuthorizerTest.class, IAccessAuthorizer.class); conf.setStrings(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java index 7eb548d7a098..9745e2da006c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java @@ -18,12 +18,14 @@ import java.io.IOException; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.security.authentication.client.AuthenticationException; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -65,7 +67,7 @@ public static void init() throws Exception { clusterId = UUID.randomUUID().toString(); scmId = UUID.randomUUID().toString(); omId = UUID.randomUUID().toString(); - conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); + conf.setTimeDuration(OZONE_OPEN_KEY_EXPIRE_THRESHOLD, 2, TimeUnit.SECONDS); cluster = MiniOzoneCluster.newBuilder(conf) .setClusterId(clusterId) .setScmId(scmId) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java new file mode 100644 index 000000000000..09e00b163396 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java @@ -0,0 +1,588 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.ozone.om; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; + +/** + * Integration tests for the open key cleanup service on OM. + */ +@RunWith(Parameterized.class) +public class TestOpenKeyCleanupService { + // Increase service interval of open key cleanup so we can trigger the + // service manually between setting up the DB and checking the results. + // Increase service interval of key deleting service to ensure it does not + // run during the tests, interfering with results. + private static final TimeDuration TESTING_SERVICE_INTERVAL = + TimeDuration.valueOf(24, TimeUnit.HOURS); + // High expiration time used so keys without modified creation time will not + // expire during the test. + private static final TimeDuration TESTING_EXPIRE_THRESHOLD = + TimeDuration.valueOf(24, TimeUnit.HOURS); + // Maximum number of keys to be cleaned up per run of the service. + private static final int TESTING_TASK_LIMIT = 10; + // Volume and bucket created and added to the DB that will hold open keys + // created by this test unless tests specify otherwise. + private static final String DEFAULT_VOLUME = "volume"; + private static final String DEFAULT_BUCKET = "bucket"; + // Time in milliseconds to wait for followers in the cluster to apply + // transactions. + private static final int FOLLOWER_WAIT_TIMEOUT = 10000; + // Time in milliseconds between checks that followers have applied + // transactions. + private static final int FOLLOWER_CHECK_INTERVAL = 1000; + + private MiniOzoneCluster cluster; + private boolean isOMHA; + private List ozoneManagers; + + // Parameterized to test open key cleanup in both OM HA and non-HA. + @Parameterized.Parameters + public static Iterable parameters() { + List params = new ArrayList<>(); + params.add(new Boolean[]{true}); + params.add(new Boolean[]{false}); + return params; + } + + public TestOpenKeyCleanupService(boolean isOMHA) { + this.isOMHA = isOMHA; + } + + private void setupCluster() throws Exception { + setupCluster(TESTING_SERVICE_INTERVAL, TESTING_EXPIRE_THRESHOLD); + } + + private void setupCluster(TimeDuration openKeyCleanupServiceInterval, + TimeDuration openKeyExpireThreshold) throws Exception { + + OzoneConfiguration conf = new OzoneConfiguration(); + + // Make sure key deletion does not run during the tests. + conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + TESTING_SERVICE_INTERVAL.getDuration(), + TESTING_SERVICE_INTERVAL.getUnit()); + // Set open key cleanup configurations. + conf.setTimeDuration(OMConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL, + openKeyCleanupServiceInterval.getDuration(), + openKeyCleanupServiceInterval.getUnit()); + conf.setTimeDuration(OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD, + openKeyExpireThreshold.getDuration(), openKeyExpireThreshold.getUnit()); + conf.setInt(OMConfigKeys.OZONE_OPEN_KEY_CLEANUP_LIMIT_PER_TASK, + TESTING_TASK_LIMIT); + + if (isOMHA) { + cluster = MiniOzoneCluster.newHABuilder(conf) + .setOMServiceId("om-service-id") + .setNumOfOzoneManagers(3) + .build(); + ozoneManagers = ((MiniOzoneHAClusterImpl) cluster).getOzoneManagersList(); + } else { + cluster = MiniOzoneCluster.newBuilder(conf) + .build(); + ozoneManagers = Collections.singletonList(cluster.getOzoneManager()); + } + + cluster.waitForClusterToBeReady(); + + ObjectStore store = OzoneClientFactory.getRpcClient(conf).getObjectStore(); + store.createVolume(DEFAULT_VOLUME); + store.getVolume(DEFAULT_VOLUME).createBucket(DEFAULT_BUCKET); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Uses a short service interval and expiration threshold to test + * the open key cleanup service. This allows all open keys to expire without + * artificially modifying their creation time, and allows the service to be + * triggered at its service interval, instead of being manually triggered. + *

+ * This approach does not allow manually creating separate expired and + * unexpired open keys, and does not provide a way to put an upper bound on + * the number of service runs. For this reason, all other unit tests use + * manually modified key creation times to differentiate expired and + * unexpired open keys, and trigger their service runs manually. + */ + @Test + public void testWithServiceInterval() throws Exception { + TimeDuration serviceInterval = TimeDuration.valueOf(100, + TimeUnit.MILLISECONDS); + TimeDuration expireThreshold = TimeDuration.valueOf(50, + TimeUnit.MILLISECONDS); + + setupCluster(serviceInterval, expireThreshold); + + final int numBlocks = 3; + final int minRuns = 2; + final int numKeys = TESTING_TASK_LIMIT * minRuns; + + // Setup test and verify the setup. + Set originalOpenKeys = createExpiredOpenKeys(numKeys, numBlocks); + Assert.assertEquals(numKeys, originalOpenKeys.size()); + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(originalOpenKeys, getAllOpenKeys(om)); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + + // Wait for all open keys to become expired and be deleted on all OMs when + // the service interval is triggered. + for (OzoneManager om: ozoneManagers) { + LambdaTestUtils.await(FOLLOWER_WAIT_TIMEOUT, FOLLOWER_CHECK_INTERVAL, + () -> getAllOpenKeys(om).size() == 0); + + Assert.assertEquals(originalOpenKeys, getAllPendingDeleteKeys(om)); + } + + OpenKeyCleanupService service = getService(); + // The service may run more than this number of times, but it should have + // taken at least this many runs to delete all the open keys. + Assert.assertTrue(service.getRunCount().get() >= minRuns); + } + + /** + * Tests cleanup of expired open keys that do not have block data, meaning + * they should be removed from the open key table, but not added to the + * delete table. + */ + @Test + public void testOpenKeysWithoutBlockData() throws Exception { + setupCluster(); + + // Setup test and verify the setup. + Set originalOpenKeys = createOpenKeys(TESTING_TASK_LIMIT); + Assert.assertEquals(TESTING_TASK_LIMIT, originalOpenKeys.size()); + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(originalOpenKeys, getAllOpenKeys(om)); + } + + Set originalExpiredOpenKeys = + createExpiredOpenKeys(TESTING_TASK_LIMIT); + Assert.assertEquals(TESTING_TASK_LIMIT, originalExpiredOpenKeys.size()); + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(originalExpiredOpenKeys, getAllExpiredOpenKeys(om)); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + + runService(); + + // Expired open keys with no block data should be removed from open key + // table without being put in the delete table. + for (OzoneManager om: ozoneManagers) { + LambdaTestUtils.await(FOLLOWER_WAIT_TIMEOUT, FOLLOWER_CHECK_INTERVAL, + () -> originalOpenKeys.equals(getAllOpenKeys(om))); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + } + + /** + * Tests cleanup of expired open keys that do have block data, meaning + * they should be removed from the open key table, and added to the delete + * table. + */ + @Test + public void testOpenKeysWithBlockData() throws Exception { + setupCluster(); + + final int numBlocks = 3; + + // Setup test and verify the setup. + Set originalOpenKeys = createOpenKeys(TESTING_TASK_LIMIT); + Assert.assertEquals(TESTING_TASK_LIMIT, originalOpenKeys.size()); + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(originalOpenKeys, getAllOpenKeys(om)); + } + + Set originalExpiredKeys = createExpiredOpenKeys(TESTING_TASK_LIMIT, + numBlocks); + Assert.assertEquals(TESTING_TASK_LIMIT, originalExpiredKeys.size()); + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(originalExpiredKeys, getAllExpiredOpenKeys(om)); + } + + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + + runService(); + + for (OzoneManager om: ozoneManagers) { + LambdaTestUtils.await(FOLLOWER_WAIT_TIMEOUT, FOLLOWER_CHECK_INTERVAL, + () -> originalExpiredKeys.equals(getAllPendingDeleteKeys(om))); + Assert.assertEquals(originalOpenKeys, getAllOpenKeys(om)); + } + } + + @Test + public void testWithNoExpiredOpenKeys() throws Exception { + setupCluster(); + + Set originalOpenKeys = createOpenKeys(TESTING_TASK_LIMIT); + + // Verify test setup. + Assert.assertEquals(TESTING_TASK_LIMIT, originalOpenKeys.size()); + + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(getAllOpenKeys(om), originalOpenKeys); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + Assert.assertEquals(0, getAllExpiredOpenKeys(om).size()); + } + + runService(); + + // Tables should be unchanged since no keys are expired. + for (OzoneManager om: ozoneManagers) { + LambdaTestUtils.await(FOLLOWER_WAIT_TIMEOUT, FOLLOWER_CHECK_INTERVAL, + () -> originalOpenKeys.equals(getAllOpenKeys(om))); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + } + + @Test + public void testWithNoOpenKeys() throws Exception { + setupCluster(); + + // Verify test setup. + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(0, getAllOpenKeys(om).size()); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + + // Make sure service runs without errors. + runService(); + + // Tables should be unchanged since no keys are expired. + for (OzoneManager om: ozoneManagers) { + LambdaTestUtils.await(FOLLOWER_WAIT_TIMEOUT, FOLLOWER_CHECK_INTERVAL, + () -> getAllOpenKeys(om).size() == 0); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + } + + /** + * Creates more expired open keys than can be deleted by the service after + * a fixed number of runs, and checks that the service does not exceed its + * task limit by deleting the extra keys. + */ + @Test + public void testTaskLimitWithMultipleRuns() throws Exception { + setupCluster(); + + final int numServiceRuns = 2; + final int numBlocks = 3; + final int numKeysToDelete = TESTING_TASK_LIMIT * numServiceRuns; + // Create more keys than the service will clean up in the allowed number + // of runs. + final int numKeys = numKeysToDelete + TESTING_TASK_LIMIT; + + Set originalExpiredKeys = createExpiredOpenKeys(numKeys, numBlocks); + + // Verify test setup. + Assert.assertEquals(numKeys, originalExpiredKeys.size()); + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(getAllExpiredOpenKeys(om), originalExpiredKeys); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + + // After each service run, wait for the service to finish so runs do not + // pick up the same keys to delete. + for (int run = 1; run <= numServiceRuns; run++) { + runService(); + int runNum = run; + for (OzoneManager om: ozoneManagers) { + LambdaTestUtils.await(FOLLOWER_WAIT_TIMEOUT, FOLLOWER_CHECK_INTERVAL, + () -> getAllPendingDeleteKeys(om).size() == + TESTING_TASK_LIMIT * runNum); + } + } + + // Order that the service deletes keys is not defined. + // So for multiple runs that will not delete all keys, we can only + // check that the correct key counts were deleted, and that the deleted + // keys are a subset of the originally created keys. + for (OzoneManager om: ozoneManagers) { + Set pendingDeleteKeys = getAllPendingDeleteKeys(om); + Set expiredKeys = getAllExpiredOpenKeys(om); + + Assert.assertTrue(originalExpiredKeys.containsAll(pendingDeleteKeys)); + Assert.assertTrue(originalExpiredKeys.containsAll(expiredKeys)); + + // Service runs should have reached but not exceeded their task limit. + Assert.assertEquals(numKeysToDelete, pendingDeleteKeys.size()); + // All remaining keys should still be present in the open key table. + Assert.assertEquals(numKeys - numKeysToDelete, + expiredKeys.size()); + } + } + + /** + * Tests cleanup of open keys whose volume and bucket does not exist in the + * DB. This simulates the condition where open keys are deleted after the + * volume or bucket they were supposed to belong to if committed. + */ + @Test + public void testWithMissingVolumeAndBucket() throws Exception { + setupCluster(); + + int numBlocks = 3; + + // Open keys created from a non-existent volume and bucket. + Set originalExpiredKeys = createExpiredOpenKeys(DEFAULT_VOLUME + + "2", DEFAULT_BUCKET + "2", TESTING_TASK_LIMIT, numBlocks); + + // Verify test setup. + Assert.assertEquals(TESTING_TASK_LIMIT, originalExpiredKeys.size()); + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(getAllExpiredOpenKeys(om), originalExpiredKeys); + Assert.assertEquals(0, getAllPendingDeleteKeys(om).size()); + } + + runService(); + + // All keys should have been cleaned up. + for (OzoneManager om: ozoneManagers) { + LambdaTestUtils.await(FOLLOWER_WAIT_TIMEOUT, FOLLOWER_CHECK_INTERVAL, + () -> getAllExpiredOpenKeys(om).size() == 0); + + Assert.assertEquals(originalExpiredKeys, getAllPendingDeleteKeys(om)); + } + } + + /** + * Tests cleanup of expired open keys across multiple volumes and buckets, + * some of which exist and some of which do not. + */ + @Test + public void testWithMultipleVolumesAndBuckets() throws Exception { + setupCluster(); + + int numKeysPerBucket = TESTING_TASK_LIMIT; + int numBlocks = 3; + int numServiceRuns = 3; + + Set allCreatedKeys = new HashSet<>(); + + // Open keys created from the default volume and bucket. + allCreatedKeys.addAll( + createExpiredOpenKeys(numKeysPerBucket, numBlocks)); + + // Open keys created from the default volume and a non-existent bucket. + allCreatedKeys.addAll( + createExpiredOpenKeys(DEFAULT_VOLUME, DEFAULT_BUCKET + "2", + numKeysPerBucket, numBlocks)); + + // Open keys created from a non-existent volume and bucket. + allCreatedKeys.addAll( + createExpiredOpenKeys(DEFAULT_VOLUME + "2", DEFAULT_BUCKET + "2", + numKeysPerBucket, numBlocks)); + + // Verify test setup. + Assert.assertEquals(numKeysPerBucket * 3, allCreatedKeys.size()); + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(getAllOpenKeys(om), allCreatedKeys); + } + + // After each service run, wait for the service to finish so runs do not + // pick up the same keys to delete. + for (int run = 1; run <= numServiceRuns; run++) { + runService(); + int runNum = run; + for (OzoneManager om: ozoneManagers) { + LambdaTestUtils.await(FOLLOWER_WAIT_TIMEOUT, FOLLOWER_CHECK_INTERVAL, + () -> getAllPendingDeleteKeys(om).size() == + TESTING_TASK_LIMIT * runNum); + } + } + + // All keys should have been cleaned up. + for (OzoneManager om: ozoneManagers) { + Assert.assertEquals(allCreatedKeys, getAllPendingDeleteKeys(om)); + Assert.assertEquals(0, getAllExpiredOpenKeys(om).size()); + } + } + + private Set getAllExpiredOpenKeys(OzoneManager om) throws Exception { + return new HashSet<>(om.getKeyManager() + .getExpiredOpenKeys(TESTING_EXPIRE_THRESHOLD, Integer.MAX_VALUE)); + } + + private Set getAllOpenKeys(OzoneManager om) throws Exception { + Set keys = new HashSet<>(); + List> keyPairs = + om.getMetadataManager().getOpenKeyTable() + .getRangeKVs(null, Integer.MAX_VALUE); + + for (Table.KeyValue keyPair: keyPairs) { + keys.add(keyPair.getKey()); + } + + return keys; + } + + private Set getAllPendingDeleteKeys(OzoneManager om) + throws Exception { + List blocks = + om.getKeyManager().getPendingDeletionKeys(Integer.MAX_VALUE); + + Set keyNames = new HashSet<>(); + for (BlockGroup block: blocks) { + keyNames.add(block.getGroupID()); + } + + return keyNames; + } + + /** + * Runs the key deleting service on the OM leader, + * but does not wait for OMs to apply results of the + * run to their DBs before returning. + * + * Callers should wait on their desired state for each OM after the service + * runs before running assertions about OM state. + */ + private void runService() throws Exception { + getService().getTasks().poll().call(); + } + + private OpenKeyCleanupService getService() { + return (OpenKeyCleanupService) getLeader().getKeyManager() + .getOpenKeyCleanupService(); + } + + private OzoneManager getLeader() { + if (cluster instanceof MiniOzoneHAClusterImpl) { + return ((MiniOzoneHAClusterImpl) cluster).getOMLeader(); + } else { + return cluster.getOzoneManager(); + } + } + + private Set createExpiredOpenKeys(int numKeys) throws Exception { + return createOpenKeys(DEFAULT_VOLUME, DEFAULT_BUCKET, numKeys, 0, true); + } + + private Set createExpiredOpenKeys(int numKeys, int numBlocks) + throws Exception { + return createOpenKeys(DEFAULT_VOLUME, DEFAULT_BUCKET, numKeys, numBlocks, + true); + } + + private Set createExpiredOpenKeys(String volume, String bucket, + int numKeys, int numBlocks) throws Exception { + return createOpenKeys(volume, bucket, numKeys, numBlocks, true); + } + + private Set createOpenKeys(int numKeys) throws Exception { + return createOpenKeys(DEFAULT_VOLUME, DEFAULT_BUCKET, numKeys, 0, false); + } + + /** + * Adds open keys to the open key table of every OM in the cluster. + * Keys are manually inserted into each OM's DB so that creation time can + * be artificially set to simulate expiration. + */ + private Set createOpenKeys(String volume, String bucket, int numKeys, + int numBlocks, boolean expired) throws Exception { + Set openKeys = new HashSet<>(); + long creationTime = Instant.now().toEpochMilli(); + + // Simulate expired keys by creating them with age twice that of the + // expiration age. + if (expired) { + long ageMillis = TESTING_EXPIRE_THRESHOLD + .add(TESTING_EXPIRE_THRESHOLD) + .toLong(TimeUnit.MILLISECONDS); + creationTime -= ageMillis; + } + + for (int i = 0; i < numKeys; i++) { + String key = null; + if (i == 0) { + // Add one messy key with lots of separators for testing. + key = OM_KEY_PREFIX + + UUID.randomUUID().toString() + + OM_KEY_PREFIX + + OM_KEY_PREFIX + + UUID.randomUUID().toString() + + OM_KEY_PREFIX + + OM_KEY_PREFIX; + } else { + key = UUID.randomUUID().toString(); + } + + long clientID = new Random().nextLong(); + + OmKeyInfo keyInfo = TestOMRequestUtils.createOmKeyInfo(volume, + bucket, key, HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, 0L, creationTime); + + if (numBlocks > 0) { + TestOMRequestUtils.addKeyLocationInfo(keyInfo, 0, numBlocks); + } + + // Insert keys into every ozone manager's DB. + for (OzoneManager om: ozoneManagers) { + TestOMRequestUtils.addKeyToTable(true, false, + keyInfo, clientID, 0L, om.getMetadataManager()); + + String fullKeyName = om.getMetadataManager().getOpenKey(volume, bucket, + keyInfo.getKeyName(), clientID); + openKeys.add(fullKeyName); + } + } + + return openKeys; + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index effe32f83cc5..49c8232f2632 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -56,7 +56,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; @@ -139,7 +138,6 @@ public void init() throws Exception { conf.setBoolean(OZONE_ACL_ENABLED, true); conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); - conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS); conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java index a47aa08f2452..d05de97652fd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java @@ -42,7 +42,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED; import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE; import org.junit.Assert; @@ -83,7 +82,6 @@ private MiniOzoneCluster startCluster(boolean aclEnabled, String clusterId = UUID.randomUUID().toString(); String scmId = UUID.randomUUID().toString(); String omId = UUID.randomUUID().toString(); - conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); conf.setInt(OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); // Use native impl here, default impl doesn't do actual checks diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRestart.java index bdaca5322335..6216a3e23705 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRestart.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRestart.java @@ -40,9 +40,9 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; import org.junit.AfterClass; import org.junit.Assert; + import static org.junit.Assert.fail; import org.junit.BeforeClass; import org.junit.Rule; @@ -77,7 +77,6 @@ public static void init() throws Exception { scmId = UUID.randomUUID().toString(); omId = UUID.randomUUID().toString(); conf.setBoolean(OZONE_ACL_ENABLED, true); - conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); conf.setInt(OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); cluster = MiniOzoneCluster.newBuilder(conf) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java index 48a9c6a9ef34..8727216fc476 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java @@ -43,14 +43,14 @@ import java.security.PublicKey; import java.security.cert.X509Certificate; import java.util.UUID; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD; import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; import static org.apache.hadoop.test.GenericTestUtils.*; @@ -82,8 +82,7 @@ public void init() throws Exception { scmId = UUID.randomUUID().toString(); omId = UUID.randomUUID().toString(); conf.setBoolean(OZONE_ACL_ENABLED, true); - conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true); - conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); + conf.setTimeDuration(OZONE_OPEN_KEY_EXPIRE_THRESHOLD, 2, TimeUnit.SECONDS); conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString()); conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2); conf.set(OZONE_SCM_NAMES, "localhost"); diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java index 824a654fc0ca..0d2ab77aba7f 100644 --- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.utils.db.Table; import com.google.common.annotations.VisibleForTesting; +import org.apache.ratis.util.TimeDuration; /** * OM metadata manager interface. @@ -253,14 +254,12 @@ List listVolumes(String userName, String prefix, List getPendingDeletionKeys(int count) throws IOException; /** - * Returns the names of up to {@code count} open keys that are older than - * the configured expiration age. - * - * @param count The maximum number of open keys to return. - * @return a list of {@link String} representing names of open expired keys. + * Returns the names of up to {@code count} open keys whose time since + * creation is larger than {@code expireThreshold}. * @throws IOException */ - List getExpiredOpenKeys(int count) throws IOException; + List getExpiredOpenKeys(TimeDuration expireThreshold, int count) + throws IOException; /** * Returns the user Table. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 658f503a1a70..46b77382b564 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.om.fs.OzoneManagerFS; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.ratis.util.TimeDuration; import java.io.IOException; import java.util.List; @@ -182,25 +183,12 @@ List listTrash(String volumeName, String bucketName, List getPendingDeletionKeys(int count) throws IOException; /** - * Returns the names of up to {@code count} open keys that are older than - * the configured expiration age. - * - * @param count The maximum number of expired open keys to return. - * @return a list of {@link String} representing the names of expired - * open keys. + * Returns the names of up to {@code count} open keys whose time since + * creation is larger than {@code expireThreshold}. * @throws IOException */ - List getExpiredOpenKeys(int count) throws IOException; - - /** - * Deletes a expired open key by its name. Called when a hanging key has been - * lingering for too long. Once called, the open key entries gets removed - * from OM mdata data. - * - * @param objectKeyName object key name with #open# prefix. - * @throws IOException if specified key doesn't exist or other I/O errors. - */ - void deleteExpiredOpenKey(String objectKeyName) throws IOException; + List getExpiredOpenKeys(TimeDuration expireThreshold, int count) + throws IOException; /** * Returns the metadataManager. @@ -214,6 +202,11 @@ List listTrash(String volumeName, String bucketName, */ BackgroundService getDeletingService(); + /** + * Returns the instance of Open Key Deleting Service. + * @return Background service. + */ + BackgroundService getOpenKeyCleanupService(); /** * Initiate multipart upload for the specified key. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 03f639b4e5ea..bbb2ac4ba683 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -128,6 +128,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND; @@ -140,6 +142,8 @@ import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY; import static org.apache.hadoop.util.Time.monotonicNow; + +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,6 +170,7 @@ public class KeyManagerImpl implements KeyManager { private final boolean grpcBlockTokenEnabled; private BackgroundService keyDeletingService; + private BackgroundService openKeyCleanupService; private final KeyProviderCryptoExtension kmsProvider; private final PrefixManager prefixManager; @@ -245,6 +250,19 @@ public void start(OzoneConfiguration configuration) { serviceTimeout, configuration); keyDeletingService.start(); } + + if (openKeyCleanupService == null) { + long serviceInterval = configuration.getTimeDuration( + OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL, + OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT.getDuration(), + TimeUnit.MILLISECONDS); + + // No timeout duration for open key cleanup. The OM RPC call will time + // out after 15 minutes. + openKeyCleanupService = new OpenKeyCleanupService(ozoneManager, this, + serviceInterval, configuration); + openKeyCleanupService.start(); + } } KeyProviderCryptoExtension getKMSProvider() { @@ -257,6 +275,11 @@ public void stop() throws IOException { keyDeletingService.shutdown(); keyDeletingService = null; } + + if (openKeyCleanupService != null) { + openKeyCleanupService.shutdown(); + openKeyCleanupService = null; + } } private OmBucketInfo getBucketInfo(String volumeName, String bucketName) @@ -946,14 +969,9 @@ public List getPendingDeletionKeys(final int count) } @Override - public List getExpiredOpenKeys(int count) throws IOException { - return metadataManager.getExpiredOpenKeys(count); - } - - @Override - public void deleteExpiredOpenKey(String objectKeyName) throws IOException { - Preconditions.checkNotNull(objectKeyName); - // TODO: Fix this in later patches. + public List getExpiredOpenKeys(TimeDuration expireThreshold, + int count) throws IOException { + return metadataManager.getExpiredOpenKeys(expireThreshold, count); } @Override @@ -966,6 +984,11 @@ public BackgroundService getDeletingService() { return keyDeletingService; } + @Override + public BackgroundService getOpenKeyCleanupService() { + return openKeyCleanupService; + } + @Override public OmMultipartInfo initiateMultipartUpload(OmKeyArgs omKeyArgs) throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index a144c7b1cfe1..a445ea59244c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -19,9 +19,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Paths; -import java.time.Duration; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -30,6 +28,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.hdds.client.BlockID; @@ -79,13 +78,12 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT; import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER; import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.TimeDuration; import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,7 +145,6 @@ public class OmMetadataManagerImpl implements OMMetadataManager { private DBStore store; private final OzoneManagerLock lock; - private final long openKeyExpireThresholdMS; private Table userTable; private Table volumeTable; @@ -177,9 +174,6 @@ public class OmMetadataManagerImpl implements OMMetadataManager { public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException { this.lock = new OzoneManagerLock(conf); - this.openKeyExpireThresholdMS = 1000L * conf.getInt( - OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, - OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT); // TODO: This is a temporary check. Once fully implemented, all OM state // change should go through Ratis - be it standalone (for non-HA) or // replicated (for HA). @@ -198,8 +192,6 @@ public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException { */ protected OmMetadataManagerImpl() { this.lock = new OzoneManagerLock(new OzoneConfiguration()); - this.openKeyExpireThresholdMS = - OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT; this.omEpoch = 0; } @@ -1023,28 +1015,28 @@ public List getPendingDeletionKeys(final int keyCount) } @Override - public List getExpiredOpenKeys(int count) throws IOException { + public List getExpiredOpenKeys(TimeDuration expireThreshold, + int count) throws IOException { // Only check for expired keys in the open key table, not its cache. // If a key expires while it is in the cache, it will be cleaned // up after the cache is flushed. - final Duration expirationDuration = - Duration.of(openKeyExpireThresholdMS, ChronoUnit.MILLIS); List expiredKeys = Lists.newArrayList(); try (TableIterator> - keyValueTableIterator = getOpenKeyTable().iterator()) { + keyValueTableIterator = getOpenKeyTable().iterator()) { while (keyValueTableIterator.hasNext() && expiredKeys.size() < count) { KeyValue openKeyValue = keyValueTableIterator.next(); String openKey = openKeyValue.getKey(); OmKeyInfo openKeyInfo = openKeyValue.getValue(); - Duration openKeyAge = - Duration.between( - Instant.ofEpochMilli(openKeyInfo.getCreationTime()), - Instant.now()); + long openKeyAgeMillis = + Instant.now().toEpochMilli() - openKeyInfo.getCreationTime(); - if (openKeyAge.compareTo(expirationDuration) >= 0) { + TimeDuration openKeyAge = TimeDuration.valueOf(openKeyAgeMillis, + TimeUnit.MILLISECONDS); + + if (openKeyAge.compareTo(expireThreshold) >= 0) { expiredKeys.add(openKey); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java index b0f19fcacd29..82940d8cdf6d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java @@ -1,69 +1,153 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ - package org.apache.hadoop.ozone.om; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.protobuf.ServiceException; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.util.Time; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; + +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - /** - * This is the background service to delete hanging open keys. - * Scan the metadata of om periodically to get - * the keys with prefix "#open#" and ask scm to - * delete metadata accordingly, if scm returns - * success for keys, then clean up those keys. + * Background service to move keys whose creation time is past a given + * threshold from the open key table to the deleted table, where they will + * later be purged by the {@link KeyDeletingService}. */ public class OpenKeyCleanupService extends BackgroundService { - private static final Logger LOG = LoggerFactory.getLogger(OpenKeyCleanupService.class); - private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2; + // Use only a single thread for open key deletion. Multiple threads would read + // from the same table and can send deletion requests for same key multiple + // times. + private static final int OPEN_KEY_CLEANUP_CORE_POOL_SIZE = 1; + private final OzoneManager ozoneManager; private final KeyManager keyManager; - private final ScmBlockLocationProtocol scmClient; + // Dummy client ID to use for response, since this is triggered by a + // service, not the client. + private final ClientId clientId = ClientId.randomId(); + private final TimeDuration expireThreshold; + private final int cleanupLimitPerTask; + private final AtomicLong submittedOpenKeyCount; + private final AtomicLong runCount; + + OpenKeyCleanupService(OzoneManager ozoneManager, KeyManager keyManager, + long serviceInterval, ConfigurationSource conf) { - public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient, - KeyManager keyManager, int serviceInterval, - long serviceTimeout) { - super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS, - OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + super("OpenKeyCleanupService", serviceInterval, TimeUnit.MILLISECONDS, + OPEN_KEY_CLEANUP_CORE_POOL_SIZE); + this.ozoneManager = ozoneManager; this.keyManager = keyManager; - this.scmClient = scmClient; + + long expireDuration = conf.getTimeDuration( + OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD, + OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT.getDuration(), + TimeUnit.MILLISECONDS); + + this.expireThreshold = + TimeDuration.valueOf(expireDuration, TimeUnit.MILLISECONDS); + + this.cleanupLimitPerTask = conf.getInt( + OMConfigKeys.OZONE_OPEN_KEY_CLEANUP_LIMIT_PER_TASK, + OMConfigKeys.OZONE_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT); + + this.submittedOpenKeyCount = new AtomicLong(0); + this.runCount = new AtomicLong(0); + } + + /** + * Returns the number of times this Background service has run. + * + * @return Long, run count. + */ + @VisibleForTesting + public AtomicLong getRunCount() { + return runCount; + } + + /** + * Returns the number of open keys that were submitted for deletion by this + * service. If these keys were committed from the open key table between + * being submitted for deletion and the actual delete operation, they will + * not be deleted. + * + * @return Long count. + */ + @VisibleForTesting + public AtomicLong getSubmittedOpenKeyCount() { + return submittedOpenKeyCount; } @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); - queue.add(new OpenKeyDeletingTask()); + queue.add(new OpenKeyCleanupTask()); return queue; } - private class OpenKeyDeletingTask implements BackgroundTask { + private boolean shouldRun() { + if (ozoneManager == null) { + // OzoneManager can be null for testing + return true; + } + return ozoneManager.isLeaderReady(); + } + + private boolean isRatisEnabled() { + if (ozoneManager == null) { + return false; + } + return ozoneManager.isRatisEnabled(); + } + private class OpenKeyCleanupTask implements BackgroundTask { @Override public int getPriority() { return 0; @@ -71,18 +155,137 @@ public int getPriority() { @Override public BackgroundTaskResult call() throws Exception { - // This method is currently never used. It will be implemented in - // HDDS-4122, and integrated into the rest of the code base in HDDS-4123. + // Check if this is the Leader OM. If not leader, no need to execute this + // task. + if (shouldRun()) { + runCount.incrementAndGet(); + + try { + long startTime = Time.monotonicNow(); + List expiredOpenKeys = keyManager.getExpiredOpenKeys( + expireThreshold, cleanupLimitPerTask); + + if (expiredOpenKeys != null && !expiredOpenKeys.isEmpty()) { + OMRequest omRequest = buildOpenKeyDeleteRequest(expiredOpenKeys); + if (isRatisEnabled()) { + submitRatisRequest(ozoneManager, omRequest); + } else { + ozoneManager.getOmServerProtocol().submitRequest(null, omRequest); + } + + LOG.debug("Number of expired keys submitted for deletion: {}, " + + "elapsed time: {}ms", + expiredOpenKeys.size(), Time.monotonicNow() - startTime); + submittedOpenKeyCount.addAndGet(expiredOpenKeys.size()); + } + } catch (IOException e) { + LOG.error("Error while running delete keys background task. Will " + + "retry at next run.", e); + } + } + // By design, no one cares about the results of this call back. + return EmptyTaskResult.newResult(); + } + + /** + * Builds a Ratis request to move the keys in {@code expiredOpenKeys} + * out of the open key table and into the delete table. + */ + private OMRequest buildOpenKeyDeleteRequest( + List expiredOpenKeys) { + Map, List> openKeysPerBucket = + new HashMap<>(); + + for (String keyName: expiredOpenKeys) { + // Separate volume, bucket, key name, and client ID, and add to the + // bucket grouping map. + addToMap(openKeysPerBucket, keyName); + LOG.debug("Open Key {} has been marked as expired and is being " + + "submitted for deletion", keyName); + } + + DeleteOpenKeysRequest.Builder requestBuilder = + DeleteOpenKeysRequest.newBuilder(); + + // Add keys to open key delete request by bucket. + for (Map.Entry, List> entry: + openKeysPerBucket.entrySet()) { + + Pair volumeBucketPair = entry.getKey(); + OpenKeyBucket openKeyBucket = OpenKeyBucket.newBuilder() + .setVolumeName(volumeBucketPair.getLeft()) + .setBucketName(volumeBucketPair.getRight()) + .addAllKeys(entry.getValue()) + .build(); + requestBuilder.addOpenKeysPerBucket(openKeyBucket); + } + + return OMRequest.newBuilder() + .setCmdType(Type.DeleteOpenKeys) + .setDeleteOpenKeysRequest(requestBuilder) + .setClientId(clientId.toString()) + .build(); + } + + private void submitRatisRequest(OzoneManager om, OMRequest omRequest) { + try { + OzoneManagerRatisServer server = om.getOmRatisServer(); + RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() + .setClientId(ClientId.randomId()) + .setServerId(server.getRaftPeerId()) + .setGroupId(server.getRaftGroupId()) + .setCallId(0) + .setMessage( + Message.valueOf( + OMRatisHelper.convertRequestToByteString(omRequest))) + .setType(RaftClientRequest.writeRequestType()) + .build(); + + server.submitRequest(omRequest, raftClientRequest); + } catch (ServiceException ex) { + LOG.error("Open key delete request failed. Will retry at next run.", + ex); + } + } + + /** + * Separates {@code openKeyName} into its volume, bucket, key, and client + * ID. Creates an {@link OpenKey} object from {@code openKeyName}'s key and + * client ID, and maps {@code openKeyName}'s volume and bucket to this + * {@link OpenKey}. + */ + private void addToMap(Map, List> + openKeysPerBucket, String openKeyName) { + // First element of the split is an empty string since the key begins + // with the separator. + // Key may contain multiple instances of the separator as well, + // for example: /volume/bucket/dir1//dir2/dir3/file1////10000 + String[] split = openKeyName.split(OM_KEY_PREFIX); + Preconditions.assertTrue(split.length >= 5, + "Unable to separate volume, bucket, key, and client ID from" + + " open key {}.", openKeyName); + + Pair volumeBucketPair = Pair.of(split[1], split[2]); + String key = String.join(OM_KEY_PREFIX, + Arrays.copyOfRange(split, 3, split.length - 1)); + String clientID = split[split.length - 1]; + + if (!openKeysPerBucket.containsKey(volumeBucketPair)) { + openKeysPerBucket.put(volumeBucketPair, new ArrayList<>()); + } + try { - // The new API for deleting expired open keys in OM HA will differ - // significantly from the old implementation. - // The old implementation has been removed so the code compiles. - keyManager.getExpiredOpenKeys(0); - } catch (IOException e) { - LOG.error("Unable to get hanging open keys, retry in" - + " next interval", e); + OpenKey openKey = OpenKey.newBuilder() + .setName(key) + .setClientID(Long.parseLong(clientID)) + .build(); + openKeysPerBucket.get(volumeBucketPair).add(openKey); + } catch (NumberFormatException ex) { + // If the client ID cannot be parsed correctly, do not add the key to + // the map. + LOG.error("Failed to parse client ID {} as a long from open key {}.", + clientID, openKeyName, ex); } - return BackgroundTaskResult.EmptyTaskResult.newResult(); } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 681c0da87e6d..e205e01d3457 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -64,6 +64,7 @@ import org.apache.hadoop.ozone.om.request.volume.acl.OMVolumeAddAclRequest; import org.apache.hadoop.ozone.om.request.volume.acl.OMVolumeRemoveAclRequest; import org.apache.hadoop.ozone.om.request.volume.acl.OMVolumeSetAclRequest; +import org.apache.hadoop.ozone.om.response.key.OMOpenKeysDeleteRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType; @@ -160,6 +161,8 @@ public static OMClientRequest createClientRequest(OMRequest omRequest) { return new S3GetSecretRequest(omRequest); case RecoverTrash: return new OMTrashRecoverRequest(omRequest); + case DeleteOpenKeys: + return new OMOpenKeysDeleteRequest(omRequest); default: throw new IllegalStateException("Unrecognized write command " + "type request" + cmdType); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java index 5cc048b12a12..fba14d2edeb4 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo; import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; +import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -39,9 +40,8 @@ import java.util.TreeSet; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT; import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS; @@ -547,12 +547,22 @@ public void testGetExpiredOpenKeys() throws Exception { final long clientID = 1000L; // To create expired keys, they will be assigned a creation time twice as // old as the minimum expiration time. - final long minExpiredTimeSeconds = ozoneConfiguration.getInt( - OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, - OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT); + TimeUnit expireUnit = + OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT.getUnit(); + + long expireThreshold = ozoneConfiguration.getTimeDuration( + OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD, + OMConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT.getDuration(), + expireUnit); + + TimeDuration expireDuration = TimeDuration.valueOf(expireThreshold, + expireUnit); + + long expireThresholdMillis = expireDuration.toLong(TimeUnit.MILLISECONDS); + final long expiredAgeMillis = - Instant.now().minus(minExpiredTimeSeconds * 2, - ChronoUnit.SECONDS).toEpochMilli(); + Instant.now().minus(expireThresholdMillis * 2, + ChronoUnit.MILLIS).toEpochMilli(); // Add expired keys to open key table. // The method under test does not check for expired open keys in the @@ -583,7 +593,8 @@ public void testGetExpiredOpenKeys() throws Exception { // Test retrieving fewer expired keys than actually exist. List someExpiredKeys = - omMetadataManager.getExpiredOpenKeys(numExpiredOpenKeys - 1); + omMetadataManager.getExpiredOpenKeys(expireDuration, + numExpiredOpenKeys - 1); Assert.assertEquals(numExpiredOpenKeys - 1, someExpiredKeys.size()); for (String key: someExpiredKeys) { @@ -592,7 +603,8 @@ public void testGetExpiredOpenKeys() throws Exception { // Test attempting to retrieving more expired keys than actually exist. List allExpiredKeys = - omMetadataManager.getExpiredOpenKeys(numExpiredOpenKeys + 1); + omMetadataManager.getExpiredOpenKeys(expireDuration, + numExpiredOpenKeys + 1); Assert.assertEquals(numExpiredOpenKeys, allExpiredKeys.size()); for (String key: allExpiredKeys) { @@ -601,7 +613,8 @@ public void testGetExpiredOpenKeys() throws Exception { // Test retrieving exact amount of expired keys that exist. allExpiredKeys = - omMetadataManager.getExpiredOpenKeys(numExpiredOpenKeys); + omMetadataManager.getExpiredOpenKeys(expireDuration, + numExpiredOpenKeys); Assert.assertEquals(numExpiredOpenKeys, allExpiredKeys.size()); for (String key: allExpiredKeys) {