Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ message OpenKeyBucket {

message OpenKey {
required string name = 1;
required uint64 clientID = 2;
optional uint64 clientID = 2 [deprecated=true];
}

message OMTokenProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
import org.apache.hadoop.ozone.storage.proto.
OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
Expand Down Expand Up @@ -262,11 +263,12 @@ List<OmVolumeArgs> listVolumes(String userName, String prefix,
*
* @param count The maximum number of open keys to return.
* @param expireThreshold The threshold of open key expire age.
* @return a list of {@link String} representing names of open expired keys.
* @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO).
* @return a {@link List} of {@link OpenKeyBucket}, the expired open keys.
* @throws IOException
*/
List<String> getExpiredOpenKeys(Duration expireThreshold, int count)
throws IOException;
List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold, int count,
BucketLayout bucketLayout) throws IOException;

/**
* Returns the user Table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -123,12 +125,12 @@ List<RepeatedOmKeyInfo> listTrash(String volumeName, String bucketName,
*
* @param count The maximum number of expired open keys to return.
* @param expireThreshold The threshold of open key expiration age.
* @return a list of {@link String} representing the names of expired
* open keys.
* @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO).
* @return a {@link List} of {@link OpenKeyBucket}, the expired open keys.
* @throws IOException
*/
List<String> getExpiredOpenKeys(Duration expireThreshold, int count)
throws IOException;
List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold, int count,
BucketLayout bucketLayout) throws IOException;

/**
* Returns the metadataManager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
Expand Down Expand Up @@ -582,9 +583,10 @@ public List<BlockGroup> getPendingDeletionKeys(final int count)
}

@Override
public List<String> getExpiredOpenKeys(Duration expireThreshold,
int count) throws IOException {
return metadataManager.getExpiredOpenKeys(expireThreshold, count);
public List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
return metadataManager.getExpiredOpenKeys(expireThreshold, count,
bucketLayout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
import org.apache.hadoop.ozone.storage.proto
.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
Expand Down Expand Up @@ -1193,33 +1195,46 @@ public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
}

@Override
public List<String> getExpiredOpenKeys(Duration expireThreshold,
int count) throws IOException {
public List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
Map<String, OpenKeyBucket.Builder> expiredKeys = new HashMap<>();

// Only check for expired keys in the open key table, not its cache.
// If a key expires while it is in the cache, it will be cleaned
// up after the cache is flushed.
List<String> expiredKeys = Lists.newArrayList();

try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
keyValueTableIterator = getOpenKeyTable(getBucketLayout()).iterator()) {
keyValueTableIterator = getOpenKeyTable(bucketLayout).iterator()) {

final long expiredCreationTimestamp =
Instant.now().minus(expireThreshold).toEpochMilli();

final long queryTime = Instant.now().toEpochMilli();
OpenKey.Builder builder = OpenKey.newBuilder();

while (keyValueTableIterator.hasNext() && expiredKeys.size() < count) {
int num = 0;
while (num < count && keyValueTableIterator.hasNext()) {
KeyValue<String, OmKeyInfo> openKeyValue = keyValueTableIterator.next();
String openKey = openKeyValue.getKey();
String dbOpenKeyName = openKeyValue.getKey();
OmKeyInfo openKeyInfo = openKeyValue.getValue();

final long openKeyAgeMillis = queryTime - openKeyInfo.getCreationTime();
final Duration openKeyAge = Duration.ofMillis(openKeyAgeMillis);

if (openKeyAge.compareTo(expireThreshold) >= 0) {
expiredKeys.add(openKey);
if (openKeyInfo.getCreationTime() <= expiredCreationTimestamp) {
final String volume = openKeyInfo.getVolumeName();
final String bucket = openKeyInfo.getBucketName();
final String mapKey = volume + OM_KEY_PREFIX + bucket;
if (!expiredKeys.containsKey(mapKey)) {
expiredKeys.put(mapKey,
OpenKeyBucket.newBuilder()
.setVolumeName(volume)
.setBucketName(bucket));
}
expiredKeys.get(mapKey)
.addKeys(builder.setName(dbOpenKeyName).build());
num++;
}
}
}

return expiredKeys;
return expiredKeys.values().stream().map(OpenKeyBucket.Builder::build)
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,7 +79,7 @@ public BackgroundTaskResult call() throws Exception {
// The new API for deleting expired open keys in OM HA will differ
// significantly from the old implementation.
// The old implementation has been removed so the code compiles.
keyManager.getExpiredOpenKeys(Duration.ZERO, 0);
keyManager.getExpiredOpenKeys(Duration.ZERO, 0, BucketLayout.DEFAULT);
} catch (IOException e) {
LOG.error("Unable to get hanging open keys, retry in"
+ " next interval", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -41,8 +44,9 @@
import java.util.Set;
import java.util.TreeSet;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT;
Expand Down Expand Up @@ -546,12 +550,22 @@ private static BucketLayout getDefaultBucketLayout() {

@Test
public void testGetExpiredOpenKeys() throws Exception {
final String bucketName = "bucket";
final String volumeName = "volume";
testGetExpiredOpenKeys(BucketLayout.DEFAULT);
}

@Test
public void testGetExpiredOpenKeysFSO() throws Exception {
testGetExpiredOpenKeys(BucketLayout.FILE_SYSTEM_OPTIMIZED);
}

private void testGetExpiredOpenKeys(BucketLayout bucketLayout)
throws Exception {
final String bucketName = UUID.randomUUID().toString();
final String volumeName = UUID.randomUUID().toString();
final int numExpiredOpenKeys = 4;
final int numUnexpiredOpenKeys = 1;
final long clientID = 1000L;
// To create expired keys, they will be assigned a creation time twice as
// To create expired keys, they will be assigned a creation time as
// old as the minimum expiration time.
final long expireThresholdMillis = ozoneConfiguration.getTimeDuration(
OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
Expand All @@ -560,65 +574,68 @@ public void testGetExpiredOpenKeys() throws Exception {

final Duration expireThreshold = Duration.ofMillis(expireThresholdMillis);

final long expiredOpenKeyCreationTime = Instant.now()
.minus(expireThresholdMillis * 2, ChronoUnit.MILLIS).toEpochMilli();
final long expiredOpenKeyCreationTime =
Instant.now().minus(expireThreshold).toEpochMilli();

// Add expired keys to open key table.
// The method under test does not check for expired open keys in the
// cache, since they will be picked up once the cache is flushed.
Set<String> expiredKeys = new HashSet<>();
for (int i = 0; i < numExpiredOpenKeys; i++) {
OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
for (int i = 0; i < numExpiredOpenKeys + numUnexpiredOpenKeys; i++) {
final long creationTime = i < numExpiredOpenKeys ?
expiredOpenKeyCreationTime : Instant.now().toEpochMilli();
final OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
bucketName, "expired" + i, HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, 0L, expiredOpenKeyCreationTime);

OMRequestTestUtils.addKeyToTable(true, false,
keyInfo, clientID, 0L, omMetadataManager);

String groupID = omMetadataManager.getOpenKey(volumeName, bucketName,
keyInfo.getKeyName(), clientID);
expiredKeys.add(groupID);
}

// Add unexpired keys to open key table.
for (int i = 0; i < numUnexpiredOpenKeys; i++) {
OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
bucketName, "unexpired" + i, HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE);

OMRequestTestUtils.addKeyToTable(true, false,
keyInfo, clientID, 0L, omMetadataManager);
HddsProtos.ReplicationFactor.ONE, 0L, creationTime);

final String dbOpenKeyName;
if (bucketLayout.isFileSystemOptimized()) {
keyInfo.setParentObjectID(i);
keyInfo.setFileName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
OMRequestTestUtils.addFileToKeyTable(true, false,
keyInfo.getFileName(), keyInfo, clientID, 0L, omMetadataManager);
dbOpenKeyName = omMetadataManager.getOpenFileName(
keyInfo.getParentObjectID(), keyInfo.getFileName(), clientID);
} else {
OMRequestTestUtils.addKeyToTable(true, false,
keyInfo, clientID, 0L, omMetadataManager);
dbOpenKeyName = omMetadataManager.getOpenKey(volumeName, bucketName,
keyInfo.getKeyName(), clientID);
}
expiredKeys.add(dbOpenKeyName);
}

// Test retrieving fewer expired keys than actually exist.
List<String> someExpiredKeys =
List<OpenKeyBucket> someExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
numExpiredOpenKeys - 1);

Assert.assertEquals(numExpiredOpenKeys - 1, someExpiredKeys.size());
for (String key: someExpiredKeys) {
Assert.assertTrue(expiredKeys.contains(key));
}
numExpiredOpenKeys - 1, bucketLayout);
List<String> names = getOpenKeyNames(someExpiredKeys);
Assert.assertEquals(numExpiredOpenKeys - 1, names.size());
Assert.assertTrue(expiredKeys.containsAll(names));

// Test attempting to retrieving more expired keys than actually exist.
List<String> allExpiredKeys =
List<OpenKeyBucket> allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
numExpiredOpenKeys + 1);

Assert.assertEquals(numExpiredOpenKeys, allExpiredKeys.size());
for (String key: allExpiredKeys) {
Assert.assertTrue(expiredKeys.contains(key));
}
numExpiredOpenKeys + 1, bucketLayout);
names = getOpenKeyNames(allExpiredKeys);
Assert.assertEquals(numExpiredOpenKeys, names.size());
Assert.assertTrue(expiredKeys.containsAll(names));

// Test retrieving exact amount of expired keys that exist.
allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
numExpiredOpenKeys);
numExpiredOpenKeys, bucketLayout);
names = getOpenKeyNames(allExpiredKeys);
Assert.assertEquals(numExpiredOpenKeys, names.size());
Assert.assertTrue(expiredKeys.containsAll(names));
}

Assert.assertEquals(numExpiredOpenKeys, allExpiredKeys.size());
for (String key: allExpiredKeys) {
Assert.assertTrue(expiredKeys.contains(key));
}
private List<String> getOpenKeyNames(List<OpenKeyBucket> openKeyBuckets) {
return openKeyBuckets.stream()
.map(OpenKeyBucket::getKeysList)
.flatMap(List::stream)
.map(OpenKey::getName)
.collect(Collectors.toList());
}

private void addKeysToOM(String volumeName, String bucketName,
Expand Down