@@ -182,14 +182,15 @@ List<RepeatedOmKeyInfo> listTrash(String volumeName, String bucketName,
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;

/**
* Returns a list of all still open key info. Which contains the info about
* the key name and all its associated block IDs. A pending open key has
* prefix #open# in OM DB.
* Returns the names of up to {@code count} open keys that are older than
* the configured expiration age.
*
* @return a list of {@link BlockGroup} representing keys and blocks.
* @param count The maximum number of expired open keys to return.
* @return a list of {@link String} representing the names of expired
* open keys.
* @throws IOException
*/
List<BlockGroup> getExpiredOpenKeys() throws IOException;
List<String> getExpiredOpenKeys(int count) throws IOException;

/**
* Deletes an expired open key by its name. Called when a hanging key has been
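(Not part of the diff.) A minimal sketch of how a caller might use the new method, assuming a KeyManager instance named keyManager and a hypothetical batch size of 100:

// Fetch the names of up to 100 expired open keys and log them.
List<String> expiredOpenKeys = keyManager.getExpiredOpenKeys(100);
for (String openKeyName : expiredOpenKeys) {
  LOG.debug("Open key {} has exceeded the expiration threshold", openKeyName);
}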
@@ -940,9 +940,8 @@ public List<BlockGroup> getPendingDeletionKeys(final int count)
}

@Override
public List<BlockGroup> getExpiredOpenKeys() throws IOException {
return metadataManager.getExpiredOpenKeys();

public List<String> getExpiredOpenKeys(int count) throws IOException {
return metadataManager.getExpiredOpenKeys(count);
}

@Override
@@ -244,14 +244,14 @@ List<OmVolumeArgs> listVolumes(String userName, String prefix,
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;

/**
* Returns a list of all still open key info. Which contains the info about
* the key name and all its associated block IDs. A pending open key has
* prefix #open# in OM DB.
* Returns the names of up to {@code count} open keys that are older than
* the configured expiration age.
*
* @return a list of {@link BlockGroup} representing keys and blocks.
* @param count The maximum number of expired open keys to return.
* @return a list of {@link String} representing the names of expired open keys.
* @throws IOException
*/
List<BlockGroup> getExpiredOpenKeys() throws IOException;
List<String> getExpiredOpenKeys(int count) throws IOException;

/**
* Returns the user Table.
@@ -19,6 +19,9 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -992,10 +995,34 @@ public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
}

@Override
public List<BlockGroup> getExpiredOpenKeys() throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
// TODO: Fix the getExpiredOpenKeys, Not part of this patch.
return keyBlocksList;
public List<String> getExpiredOpenKeys(int count) throws IOException {
// Only check for expired keys in the open key table, not its cache.
// If a key expires while it is in the cache, it will be cleaned
// up after the cache is flushed.
final Duration expirationDuration =
Duration.of(openKeyExpireThresholdMS, ChronoUnit.MILLIS);
List<String> expiredKeys = Lists.newArrayList();

try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
keyValueTableIterator = getOpenKeyTable().iterator()) {

while (keyValueTableIterator.hasNext() && expiredKeys.size() < count) {
KeyValue<String, OmKeyInfo> openKeyValue = keyValueTableIterator.next();
String openKey = openKeyValue.getKey();
OmKeyInfo openKeyInfo = openKeyValue.getValue();

Duration openKeyAge =
Duration.between(
Instant.ofEpochMilli(openKeyInfo.getCreationTime()),
Instant.now());

if (openKeyAge.compareTo(expirationDuration) >= 0) {
expiredKeys.add(openKey);
}
}
}

return expiredKeys;
}

@Override
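(Not part of the diff.) The expiration check above uses java.time rather than raw millisecond arithmetic. A self-contained sketch of the same comparison, with an illustrative threshold and creation time:

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public final class ExpirationCheckSketch {
  public static void main(String[] args) {
    // Illustrative values: a 1-hour threshold and a key created 2 hours ago.
    long openKeyExpireThresholdMS = 60 * 60 * 1000L;
    long creationTime = Instant.now().minus(2, ChronoUnit.HOURS).toEpochMilli();

    Duration expiration = Duration.of(openKeyExpireThresholdMS, ChronoUnit.MILLIS);
    Duration age =
        Duration.between(Instant.ofEpochMilli(creationTime), Instant.now());

    // A key counts as expired once its age reaches the threshold.
    System.out.println("expired = " + (age.compareTo(expiration) >= 0));
  }
}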
@@ -18,10 +18,7 @@

package org.apache.hadoop.ozone.om;

import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
@@ -30,7 +27,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
@@ -76,39 +72,13 @@ public int getPriority() {

@Override
public BackgroundTaskResult call() throws Exception {
// This method is currently never used. It will be implemented in
// HDDS-4122, and integrated into the rest of the code base in HDDS-4123.
try {
List<BlockGroup> keyBlocksList = keyManager.getExpiredOpenKeys();
if (keyBlocksList.size() > 0) {
int toDeleteSize = keyBlocksList.size();
LOG.debug("Found {} to-delete open keys in OM", toDeleteSize);
List<DeleteBlockGroupResult> results =
scmClient.deleteKeyBlocks(keyBlocksList);
int deletedSize = 0;
for (DeleteBlockGroupResult result : results) {
if (result.isSuccess()) {
try {
keyManager.deleteExpiredOpenKey(result.getObjectKey());
if (LOG.isDebugEnabled()) {
LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
}
deletedSize += 1;
} catch (IOException e) {
LOG.warn("Failed to delete hanging-open key {}",
result.getObjectKey(), e);
}
} else {
LOG.warn("Deleting open Key {} failed because some of the blocks"
+ " were failed to delete, failed blocks: {}",
result.getObjectKey(),
StringUtils.join(",", result.getFailedBlocks()));
}
}
LOG.info("Found {} expired open key entries, successfully " +
"cleaned up {} entries", toDeleteSize, deletedSize);
return results::size;
} else {
LOG.debug("No hanging open key found in OM");
}
// The new API for deleting expired open keys in OM HA will differ
// significantly from the old implementation.
// The old implementation has been removed so the code compiles.
keyManager.getExpiredOpenKeys(0);
} catch (IOException e) {
LOG.error("Unable to get hanging open keys, retry in"
+ " next interval", e);
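(Not part of the diff.) Purely as an illustration of the direction the comment describes, not the planned HDDS-4122/HDDS-4123 implementation, a future task body might fetch a bounded batch of expired open key names and hand them to the new OM HA delete path; the batch size of 1000 is hypothetical:

@Override
public BackgroundTaskResult call() throws Exception {
  // Hypothetical batch size; a real implementation would make this configurable.
  List<String> expiredOpenKeys = keyManager.getExpiredOpenKeys(1000);
  LOG.debug("Found {} expired open keys in OM", expiredOpenKeys.size());
  // Deletion itself would be submitted through the new OM HA request path
  // rather than written to the DB directly, as in the removed code above.
  return expiredOpenKeys::size;
}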
@@ -33,9 +33,15 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;

@@ -521,6 +527,77 @@ public void testListKeysWithFewDeleteEntriesInCache() throws Exception {

}

@Test
public void testGetExpiredOpenKeys() throws Exception {
final String bucketName = "bucket";
final String volumeName = "volume";
final int numExpiredOpenKeys = 4;
final int numUnexpiredOpenKeys = 1;
final long clientID = 1000L;
// To create expired keys, assign them a creation time that is twice the
// minimum expiration age in the past.
final long minExpiredTimeSeconds = ozoneConfiguration.getInt(
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
final long expiredAgeMillis =
Instant.now().minus(minExpiredTimeSeconds * 2,
ChronoUnit.SECONDS).toEpochMilli();

// Add expired keys to open key table.
// The method under test does not check for expired open keys in the
// cache, since they will be picked up once the cache is flushed.
Set<String> expiredKeys = new HashSet<>();
for (int i = 0; i < numExpiredOpenKeys; i++) {
OmKeyInfo keyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
bucketName, "expired" + i, HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, 0L, expiredAgeMillis);

TestOMRequestUtils.addKeyToTable(true, false,
keyInfo, clientID, 0L, omMetadataManager);

String groupID = omMetadataManager.getOpenKey(volumeName, bucketName,
keyInfo.getKeyName(), clientID);
expiredKeys.add(groupID);
}

// Add unexpired keys to open key table.
for (int i = 0; i < numUnexpiredOpenKeys; i++) {
OmKeyInfo keyInfo = TestOMRequestUtils.createOmKeyInfo(volumeName,
bucketName, "unexpired" + i, HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE);

TestOMRequestUtils.addKeyToTable(true, false,
keyInfo, clientID, 0L, omMetadataManager);
}

// Test retrieving fewer expired keys than actually exist.
List<String> someExpiredKeys =
omMetadataManager.getExpiredOpenKeys(numExpiredOpenKeys - 1);

Assert.assertEquals(numExpiredOpenKeys - 1, someExpiredKeys.size());
for (String key: someExpiredKeys) {
Assert.assertTrue(expiredKeys.contains(key));
}

// Test attempting to retrieve more expired keys than actually exist.
List<String> allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(numExpiredOpenKeys + 1);

Assert.assertEquals(numExpiredOpenKeys, allExpiredKeys.size());
for (String key: allExpiredKeys) {
Assert.assertTrue(expiredKeys.contains(key));
}

// Test retrieving exactly as many expired keys as exist.
allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(numExpiredOpenKeys);

Assert.assertEquals(numExpiredOpenKeys, allExpiredKeys.size());
for (String key: allExpiredKeys) {
Assert.assertTrue(expiredKeys.contains(key));
}
}

private void addKeysToOM(String volumeName, String bucketName,
String keyName, int i) throws Exception {

@@ -136,6 +136,26 @@ public static void addKeyToTable(boolean openKeyTable, boolean addToCache,
OmKeyInfo omKeyInfo = createOmKeyInfo(volumeName, bucketName, keyName,
replicationType, replicationFactor, trxnLogIndex);

addKeyToTable(openKeyTable, addToCache, omKeyInfo, clientID, trxnLogIndex,
omMetadataManager);

}

/**
* Adds a key entry to the KeyTable. If the openKeyTable flag is true, the
* entry is added to the openKeyTable; otherwise it is added to the keyTable.
* @throws Exception
*/
public static void addKeyToTable(boolean openKeyTable, boolean addToCache,
OmKeyInfo omKeyInfo, long clientID,
long trxnLogIndex,
OMMetadataManager omMetadataManager)
throws Exception {

String volumeName = omKeyInfo.getVolumeName();
String bucketName = omKeyInfo.getBucketName();
String keyName = omKeyInfo.getKeyName();

if (openKeyTable) {
String ozoneKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, clientID);
@@ -213,13 +233,24 @@ public static OmKeyInfo createOmKeyInfo(String volumeName, String bucketName,
public static OmKeyInfo createOmKeyInfo(String volumeName, String bucketName,
String keyName, HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor replicationFactor, long objectID) {
return createOmKeyInfo(volumeName, bucketName, keyName, replicationType,
replicationFactor, objectID, Time.now());
}

/**
* Create OmKeyInfo.
*/
public static OmKeyInfo createOmKeyInfo(String volumeName, String bucketName,
String keyName, HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor replicationFactor, long objectID,
long creationTime) {
return new OmKeyInfo.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>())))
.setCreationTime(Time.now())
.setCreationTime(creationTime)
.setModificationTime(Time.now())
.setDataSize(1000L)
.setReplicationType(replicationType)
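(Not part of the diff.) Together, the two new overloads let a test build a key with an arbitrary creation time and insert it straight into the open key table. A usage sketch with illustrative names and values, assuming an omMetadataManager instance is in scope:

// Create an open key whose creation time lies 10 minutes in the past and
// add it to the open key table (openKeyTable = true, addToCache = false).
long tenMinutesAgo = Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli();
OmKeyInfo keyInfo = TestOMRequestUtils.createOmKeyInfo("volume", "bucket",
    "key1", HddsProtos.ReplicationType.RATIS,
    HddsProtos.ReplicationFactor.ONE, 0L, tenMinutesAgo);
TestOMRequestUtils.addKeyToTable(true, false, keyInfo, 1000L, 0L,
    omMetadataManager);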