diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 1bad21582f5e..d91d8519a967 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -1031,7 +1031,7 @@ message OpenKeyBucket { message OpenKey { required string name = 1; - required uint64 clientID = 2; + optional uint64 clientID = 2 [deprecated=true]; } message OMTokenProto { diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java index aee7d78957b0..9064968db327 100644 --- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.lock.OzoneManagerLock; import org.apache.hadoop.hdds.utils.TransactionInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket; import org.apache.hadoop.ozone.storage.proto. OzoneManagerStorageProtos.PersistedUserVolumeInfo; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; @@ -262,11 +263,12 @@ List listVolumes(String userName, String prefix, * * @param count The maximum number of open keys to return. * @param expireThreshold The threshold of open key expire age. - * @return a list of {@link String} representing names of open expired keys. + * @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO). + * @return a {@link List} of {@link OpenKeyBucket}, the expired open keys. * @throws IOException */ - List getExpiredOpenKeys(Duration expireThreshold, int count) - throws IOException; + List getExpiredOpenKeys(Duration expireThreshold, int count, + BucketLayout bucketLayout) throws IOException; /** * Returns the user Table. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index f86eaf4758c9..9e5ae1cc494d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; @@ -26,6 +27,7 @@ import org.apache.hadoop.ozone.om.fs.OzoneManagerFS; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket; import java.io.IOException; import java.time.Duration; @@ -123,12 +125,12 @@ List listTrash(String volumeName, String bucketName, * * @param count The maximum number of expired open keys to return. * @param expireThreshold The threshold of open key expiration age. - * @return a list of {@link String} representing the names of expired - * open keys. + * @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO). + * @return a {@link List} of {@link OpenKeyBucket}, the expired open keys. * @throws IOException */ - List getExpiredOpenKeys(Duration expireThreshold, int count) - throws IOException; + List getExpiredOpenKeys(Duration expireThreshold, int count, + BucketLayout bucketLayout) throws IOException; /** * Returns the metadataManager. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index cd04acbef076..8411e3e4c7f3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -86,6 +86,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo; import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; @@ -582,9 +583,10 @@ public List getPendingDeletionKeys(final int count) } @Override - public List getExpiredOpenKeys(Duration expireThreshold, - int count) throws IOException { - return metadataManager.getExpiredOpenKeys(expireThreshold, count); + public List getExpiredOpenKeys(Duration expireThreshold, + int count, BucketLayout bucketLayout) throws IOException { + return metadataManager.getExpiredOpenKeys(expireThreshold, count, + bucketLayout); } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index b609b26f0de9..b25f3f82c4d3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -73,6 +73,8 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.lock.OzoneManagerLock; import org.apache.hadoop.hdds.utils.TransactionInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket; import org.apache.hadoop.ozone.storage.proto .OzoneManagerStorageProtos.PersistedUserVolumeInfo; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; @@ -1193,33 +1195,46 @@ public List getPendingDeletionKeys(final int keyCount) } @Override - public List getExpiredOpenKeys(Duration expireThreshold, - int count) throws IOException { + public List getExpiredOpenKeys(Duration expireThreshold, + int count, BucketLayout bucketLayout) throws IOException { + Map expiredKeys = new HashMap<>(); + // Only check for expired keys in the open key table, not its cache. // If a key expires while it is in the cache, it will be cleaned // up after the cache is flushed. - List expiredKeys = Lists.newArrayList(); - try (TableIterator> - keyValueTableIterator = getOpenKeyTable(getBucketLayout()).iterator()) { + keyValueTableIterator = getOpenKeyTable(bucketLayout).iterator()) { + + final long expiredCreationTimestamp = + Instant.now().minus(expireThreshold).toEpochMilli(); - final long queryTime = Instant.now().toEpochMilli(); + OpenKey.Builder builder = OpenKey.newBuilder(); - while (keyValueTableIterator.hasNext() && expiredKeys.size() < count) { + int num = 0; + while (num < count && keyValueTableIterator.hasNext()) { KeyValue openKeyValue = keyValueTableIterator.next(); - String openKey = openKeyValue.getKey(); + String dbOpenKeyName = openKeyValue.getKey(); OmKeyInfo openKeyInfo = openKeyValue.getValue(); - final long openKeyAgeMillis = queryTime - openKeyInfo.getCreationTime(); - final Duration openKeyAge = Duration.ofMillis(openKeyAgeMillis); - - if (openKeyAge.compareTo(expireThreshold) >= 0) { - expiredKeys.add(openKey); + if (openKeyInfo.getCreationTime() <= expiredCreationTimestamp) { + final String volume = openKeyInfo.getVolumeName(); + final String bucket = openKeyInfo.getBucketName(); + final String mapKey = volume + OM_KEY_PREFIX + bucket; + if (!expiredKeys.containsKey(mapKey)) { + expiredKeys.put(mapKey, + OpenKeyBucket.newBuilder() + .setVolumeName(volume) + .setBucketName(bucket)); + } + expiredKeys.get(mapKey) + .addKeys(builder.setName(dbOpenKeyName).build()); + num++; } } } - return expiredKeys; + return expiredKeys.values().stream().map(OpenKeyBucket.Builder::build) + .collect(Collectors.toList()); } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java index 05dec6c3b882..1a4e17c96fdb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +79,7 @@ public BackgroundTaskResult call() throws Exception { // The new API for deleting expired open keys in OM HA will differ // significantly from the old implementation. // The old implementation has been removed so the code compiles. - keyManager.getExpiredOpenKeys(Duration.ZERO, 0); + keyManager.getExpiredOpenKeys(Duration.ZERO, 0, BucketLayout.DEFAULT); } catch (IOException e) { LOG.error("Unable to get hanging open keys, retry in" + " next interval", e); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java index f267a093bde7..aa8e98c91d21 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java @@ -27,7 +27,10 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.hdds.utils.TransactionInfo; +import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -41,8 +44,9 @@ import java.util.Set; import java.util.TreeSet; import java.time.Instant; -import java.time.temporal.ChronoUnit; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT; @@ -546,12 +550,22 @@ private static BucketLayout getDefaultBucketLayout() { @Test public void testGetExpiredOpenKeys() throws Exception { - final String bucketName = "bucket"; - final String volumeName = "volume"; + testGetExpiredOpenKeys(BucketLayout.DEFAULT); + } + + @Test + public void testGetExpiredOpenKeysFSO() throws Exception { + testGetExpiredOpenKeys(BucketLayout.FILE_SYSTEM_OPTIMIZED); + } + + private void testGetExpiredOpenKeys(BucketLayout bucketLayout) + throws Exception { + final String bucketName = UUID.randomUUID().toString(); + final String volumeName = UUID.randomUUID().toString(); final int numExpiredOpenKeys = 4; final int numUnexpiredOpenKeys = 1; final long clientID = 1000L; - // To create expired keys, they will be assigned a creation time twice as + // To create expired keys, they will be assigned a creation time as // old as the minimum expiration time. final long expireThresholdMillis = ozoneConfiguration.getTimeDuration( OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD, @@ -560,65 +574,68 @@ public void testGetExpiredOpenKeys() throws Exception { final Duration expireThreshold = Duration.ofMillis(expireThresholdMillis); - final long expiredOpenKeyCreationTime = Instant.now() - .minus(expireThresholdMillis * 2, ChronoUnit.MILLIS).toEpochMilli(); + final long expiredOpenKeyCreationTime = + Instant.now().minus(expireThreshold).toEpochMilli(); // Add expired keys to open key table. // The method under test does not check for expired open keys in the // cache, since they will be picked up once the cache is flushed. Set expiredKeys = new HashSet<>(); - for (int i = 0; i < numExpiredOpenKeys; i++) { - OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName, + for (int i = 0; i < numExpiredOpenKeys + numUnexpiredOpenKeys; i++) { + final long creationTime = i < numExpiredOpenKeys ? + expiredOpenKeyCreationTime : Instant.now().toEpochMilli(); + final OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, "expired" + i, HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.ONE, 0L, expiredOpenKeyCreationTime); - - OMRequestTestUtils.addKeyToTable(true, false, - keyInfo, clientID, 0L, omMetadataManager); - - String groupID = omMetadataManager.getOpenKey(volumeName, bucketName, - keyInfo.getKeyName(), clientID); - expiredKeys.add(groupID); - } - - // Add unexpired keys to open key table. - for (int i = 0; i < numUnexpiredOpenKeys; i++) { - OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName, - bucketName, "unexpired" + i, HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.ONE); - - OMRequestTestUtils.addKeyToTable(true, false, - keyInfo, clientID, 0L, omMetadataManager); + HddsProtos.ReplicationFactor.ONE, 0L, creationTime); + + final String dbOpenKeyName; + if (bucketLayout.isFileSystemOptimized()) { + keyInfo.setParentObjectID(i); + keyInfo.setFileName(OzoneFSUtils.getFileName(keyInfo.getKeyName())); + OMRequestTestUtils.addFileToKeyTable(true, false, + keyInfo.getFileName(), keyInfo, clientID, 0L, omMetadataManager); + dbOpenKeyName = omMetadataManager.getOpenFileName( + keyInfo.getParentObjectID(), keyInfo.getFileName(), clientID); + } else { + OMRequestTestUtils.addKeyToTable(true, false, + keyInfo, clientID, 0L, omMetadataManager); + dbOpenKeyName = omMetadataManager.getOpenKey(volumeName, bucketName, + keyInfo.getKeyName(), clientID); + } + expiredKeys.add(dbOpenKeyName); } // Test retrieving fewer expired keys than actually exist. - List someExpiredKeys = + List someExpiredKeys = omMetadataManager.getExpiredOpenKeys(expireThreshold, - numExpiredOpenKeys - 1); - - Assert.assertEquals(numExpiredOpenKeys - 1, someExpiredKeys.size()); - for (String key: someExpiredKeys) { - Assert.assertTrue(expiredKeys.contains(key)); - } + numExpiredOpenKeys - 1, bucketLayout); + List names = getOpenKeyNames(someExpiredKeys); + Assert.assertEquals(numExpiredOpenKeys - 1, names.size()); + Assert.assertTrue(expiredKeys.containsAll(names)); // Test attempting to retrieving more expired keys than actually exist. - List allExpiredKeys = + List allExpiredKeys = omMetadataManager.getExpiredOpenKeys(expireThreshold, - numExpiredOpenKeys + 1); - - Assert.assertEquals(numExpiredOpenKeys, allExpiredKeys.size()); - for (String key: allExpiredKeys) { - Assert.assertTrue(expiredKeys.contains(key)); - } + numExpiredOpenKeys + 1, bucketLayout); + names = getOpenKeyNames(allExpiredKeys); + Assert.assertEquals(numExpiredOpenKeys, names.size()); + Assert.assertTrue(expiredKeys.containsAll(names)); // Test retrieving exact amount of expired keys that exist. allExpiredKeys = omMetadataManager.getExpiredOpenKeys(expireThreshold, - numExpiredOpenKeys); + numExpiredOpenKeys, bucketLayout); + names = getOpenKeyNames(allExpiredKeys); + Assert.assertEquals(numExpiredOpenKeys, names.size()); + Assert.assertTrue(expiredKeys.containsAll(names)); + } - Assert.assertEquals(numExpiredOpenKeys, allExpiredKeys.size()); - for (String key: allExpiredKeys) { - Assert.assertTrue(expiredKeys.contains(key)); - } + private List getOpenKeyNames(List openKeyBuckets) { + return openKeyBuckets.stream() + .map(OpenKeyBucket::getKeysList) + .flatMap(List::stream) + .map(OpenKey::getName) + .collect(Collectors.toList()); } private void addKeysToOM(String volumeName, String bucketName,