diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 66b85aa7499..048af241a33 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1439,6 +1439,17 @@
+
+ ozone.om.lease.hard.limit
+ 7d
+ OZONE, OM, PERFORMANCE
+
+ Controls how long an open hsync key is considered as active. Specifically, if a hsync key
+ has been open longer than the value of this config entry, that open hsync key is considered as
+ expired (e.g. due to client crash). Unit could be defined with postfix (ns,ms,s,m,h,d)
+
+
+
ozone.om.open.key.cleanup.limit.per.task
1000
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 5dd7579eb91..ec001587de5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -141,6 +141,11 @@ private OMConfigKeys() {
public static final String OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT =
"7d";
+ public static final String OZONE_OM_LEASE_HARD_LIMIT =
+ "ozone.om.lease.hard.limit";
+ public static final String OZONE_OM_LEASE_HARD_LIMIT_DEFAULT =
+ "7d";
+
public static final String OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK =
"ozone.om.open.key.cleanup.limit.per.task";
public static final int OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT =
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 00bf5752053..bf61c037db8 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -335,7 +335,7 @@ List listVolumes(String userName, String prefix,
* @throws IOException
*/
ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
- BucketLayout bucketLayout) throws IOException;
+ BucketLayout bucketLayout, Duration leaseThreshold) throws IOException;
/**
* Returns the names of up to {@code count} MPU key whose age is greater
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 4378701426c..7a3312c0685 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -145,11 +145,12 @@ List listTrash(String volumeName, String bucketName,
* @param count The maximum number of expired open keys to return.
* @param expireThreshold The threshold of open key expiration age.
* @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO).
+ * @param leaseThreshold The threshold of hsync key.
* @return the expired open keys.
* @throws IOException
*/
ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
- BucketLayout bucketLayout) throws IOException;
+ BucketLayout bucketLayout, Duration leaseThreshold) throws IOException;
/**
* Returns the MPU infos of up to {@code count} whose age is greater
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 37b1c129af4..407f26b7a09 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -659,9 +659,9 @@ public PendingKeysDeletion getPendingDeletionKeys(final int count)
@Override
public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
- int count, BucketLayout bucketLayout) throws IOException {
+ int count, BucketLayout bucketLayout, Duration leaseThreshold) throws IOException {
return metadataManager.getExpiredOpenKeys(expireThreshold, count,
- bucketLayout);
+ bucketLayout, leaseThreshold);
}
@Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 1969bce918f..b340ce08a8f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1844,7 +1844,7 @@ public long getTotalOpenKeyCount() throws IOException {
@Override
public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
- int count, BucketLayout bucketLayout) throws IOException {
+ int count, BucketLayout bucketLayout, Duration leaseThreshold) throws IOException {
final ExpiredOpenKeys expiredKeys = new ExpiredOpenKeys();
final Table kt = getKeyTable(bucketLayout);
@@ -1857,6 +1857,8 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
final long expiredCreationTimestamp =
expireThreshold.negated().plusMillis(Time.now()).toMillis();
+ final long expiredLeaseTimestamp =
+ leaseThreshold.negated().plusMillis(Time.now()).toMillis();
int num = 0;
while (num < count && keyValueTableIterator.hasNext()) {
@@ -1871,7 +1873,8 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
continue;
}
- if (openKeyInfo.getCreationTime() <= expiredCreationTimestamp) {
+ if (openKeyInfo.getCreationTime() <= expiredCreationTimestamp ||
+ openKeyInfo.getModificationTime() <= expiredLeaseTimestamp) {
final String clientIdString
= dbOpenKeyName.substring(lastPrefix + 1);
@@ -1882,10 +1885,12 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
.filter(id -> id.equals(clientIdString))
.isPresent();
- if (!isHsync) {
+ if (!isHsync && openKeyInfo.getCreationTime() <= expiredCreationTimestamp) {
// add non-hsync'ed keys
expiredKeys.addOpenKey(openKeyInfo, dbOpenKeyName);
- } else {
+ num++;
+ } else if (isHsync && openKeyInfo.getModificationTime() <= expiredLeaseTimestamp &&
+ !openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
// add hsync'ed keys
final KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(info.getVolumeName())
@@ -1903,8 +1908,8 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
info.getReplicationConfig(), keyArgs);
expiredKeys.addHsyncKey(keyArgs, Long.parseLong(clientIdString));
+ num++;
}
- num++;
}
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
index 0f2c1f84a6c..ab556230194 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -76,6 +77,7 @@ public class OpenKeyCleanupService extends BackgroundService {
// service, not the client.
private final ClientId clientId = ClientId.randomId();
private final Duration expireThreshold;
+ private final Duration leaseThreshold;
private final int cleanupLimitPerTask;
private final AtomicLong submittedOpenKeyCount;
private final AtomicLong runCount;
@@ -96,6 +98,18 @@ public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout,
TimeUnit.MILLISECONDS);
this.expireThreshold = Duration.ofMillis(expireMillis);
+ long leaseHardMillis = conf.getTimeDuration(OMConfigKeys.OZONE_OM_LEASE_HARD_LIMIT,
+ OMConfigKeys.OZONE_OM_LEASE_HARD_LIMIT_DEFAULT, TimeUnit.MILLISECONDS);
+ long leaseSoftMillis = conf.getTimeDuration(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT,
+ OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT_DEFAULT, TimeUnit.MILLISECONDS);
+
+ if (leaseHardMillis < leaseSoftMillis) {
+ String msg = "Hard lease limit cannot be less than Soft lease limit. "
+ + "LeaseHardLimit: " + leaseHardMillis + " LeaseSoftLimit: " + leaseSoftMillis;
+ throw new IllegalArgumentException(msg);
+ }
+ this.leaseThreshold = Duration.ofMillis(leaseHardMillis);
+
this.cleanupLimitPerTask = conf.getInt(
OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK,
OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT);
@@ -178,13 +192,12 @@ public BackgroundTaskResult call() throws Exception {
if (!shouldRun()) {
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
-
runCount.incrementAndGet();
long startTime = Time.monotonicNow();
final ExpiredOpenKeys expiredOpenKeys;
try {
expiredOpenKeys = keyManager.getExpiredOpenKeys(expireThreshold,
- cleanupLimitPerTask, bucketLayout);
+ cleanupLimitPerTask, bucketLayout, leaseThreshold);
} catch (IOException e) {
LOG.error("Unable to get hanging open keys, retry in next interval", e);
return BackgroundTaskResult.EmptyTaskResult.newResult();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index 451417ba3da..7d66ba66578 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -731,7 +731,7 @@ public void testGetExpiredOpenKeys(BucketLayout bucketLayout)
// Test retrieving fewer expired keys than actually exist.
final Collection someExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
- numExpiredOpenKeys - 1, bucketLayout).getOpenKeyBuckets();
+ numExpiredOpenKeys - 1, bucketLayout, expireThreshold).getOpenKeyBuckets();
List names = getOpenKeyNames(someExpiredKeys);
assertEquals(numExpiredOpenKeys - 1, names.size());
assertThat(expiredKeys).containsAll(names);
@@ -739,7 +739,7 @@ public void testGetExpiredOpenKeys(BucketLayout bucketLayout)
// Test attempting to retrieving more expired keys than actually exist.
Collection allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
- numExpiredOpenKeys + 1, bucketLayout).getOpenKeyBuckets();
+ numExpiredOpenKeys + 1, bucketLayout, expireThreshold).getOpenKeyBuckets();
names = getOpenKeyNames(allExpiredKeys);
assertEquals(numExpiredOpenKeys, names.size());
assertThat(expiredKeys).containsAll(names);
@@ -747,7 +747,7 @@ public void testGetExpiredOpenKeys(BucketLayout bucketLayout)
// Test retrieving exact amount of expired keys that exist.
allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
- numExpiredOpenKeys, bucketLayout).getOpenKeyBuckets();
+ numExpiredOpenKeys, bucketLayout, expireThreshold).getOpenKeyBuckets();
names = getOpenKeyNames(allExpiredKeys);
assertEquals(numExpiredOpenKeys, names.size());
assertThat(expiredKeys).containsAll(names);
@@ -805,7 +805,7 @@ public void testGetExpiredOpenKeysExcludeMPUKeys(
// Return empty since only MPU-related open keys exist.
assertTrue(omMetadataManager.getExpiredOpenKeys(expireThreshold,
- numExpiredMPUOpenKeys, bucketLayout).getOpenKeyBuckets().isEmpty());
+ numExpiredMPUOpenKeys, bucketLayout, expireThreshold).getOpenKeyBuckets().isEmpty());
// This is for MPU-related open keys prior to isMultipartKey fix in
@@ -839,7 +839,7 @@ public void testGetExpiredOpenKeysExcludeMPUKeys(
// MPU-related open keys should not be fetched regardless of isMultipartKey
// flag if has the multipart upload characteristics
assertTrue(omMetadataManager.getExpiredOpenKeys(expireThreshold,
- numExpiredMPUOpenKeys, bucketLayout).getOpenKeyBuckets()
+ numExpiredMPUOpenKeys, bucketLayout, expireThreshold).getOpenKeyBuckets()
.isEmpty());
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
index 2f4016d0e96..bbd379b561d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
@@ -21,11 +21,18 @@
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
import org.apache.hadoop.ozone.om.KeyManager;
@@ -36,6 +43,7 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -46,10 +54,12 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,13 +69,17 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_LEASE_HARD_LIMIT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
class TestOpenKeyCleanupService {
private OzoneManagerProtocol writeClient;
@@ -96,6 +110,9 @@ public void createConfAndInitValues(@TempDir Path tempDir) throws Exception {
SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
EXPIRE_THRESHOLD_MS, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_OM_LEASE_HARD_LIMIT,
+ EXPIRE_THRESHOLD_MS, TimeUnit.MILLISECONDS);
+ conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s");
conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
conf.setQuietMode(false);
OmTestManagers omTestManagers = new OmTestManagers(conf);
@@ -151,8 +168,8 @@ public void testCleanupExpiredOpenKeys(
assertEquals(0, metrics.getNumOpenKeysCleaned());
assertEquals(0, metrics.getNumOpenKeysHSyncCleaned());
final int keyCount = numDEFKeys + numFSOKeys;
- createOpenKeys(numDEFKeys, false, BucketLayout.DEFAULT);
- createOpenKeys(numFSOKeys, hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ createOpenKeys(numDEFKeys, false, BucketLayout.DEFAULT, false);
+ createOpenKeys(numFSOKeys, hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED, false);
// wait for open keys to expire
Thread.sleep(EXPIRE_THRESHOLD_MS);
@@ -184,6 +201,60 @@ public void testCleanupExpiredOpenKeys(
}
}
+ /**
+ * In this test, we create a bunch of hsync keys with some keys having recover flag set.
+ * OpenKeyCleanupService should commit keys which don't have recovery flag and has expired.
+ * Keys with recovery flag and expired should be ignored by OpenKeyCleanupService.
+ * @throws IOException - on Failure.
+ */
+ @Test
+ @Timeout(300)
+ public void testIgnoreExpiredRecoverhsyncKeys() throws Exception {
+ OpenKeyCleanupService openKeyCleanupService =
+ (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService();
+
+ openKeyCleanupService.suspend();
+ // wait for submitted tasks to complete
+ Thread.sleep(SERVICE_INTERVAL);
+ final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
+ LOG.info("oldkeyCount={}", oldkeyCount);
+ assertEquals(0, oldkeyCount);
+
+ final OMMetrics metrics = om.getMetrics();
+ assertEquals(0, metrics.getNumOpenKeysHSyncCleaned());
+ int keyCount = 10;
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setState(Pipeline.PipelineState.OPEN)
+ .setId(PipelineID.randomId())
+ .setReplicationConfig(
+ StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE))
+ .setNodes(new ArrayList<>())
+ .build();
+
+ when(om.getScmClient().getContainerClient().getContainerWithPipeline(anyLong()))
+ .thenReturn(new ContainerWithPipeline(Mockito.mock(ContainerInfo.class), pipeline));
+
+ createOpenKeys(keyCount, true, BucketLayout.FILE_SYSTEM_OPTIMIZED, false);
+ // create 2 more key and mark recovery flag set
+ createOpenKeys(2, true, BucketLayout.FILE_SYSTEM_OPTIMIZED, true);
+
+ // wait for open keys to expire
+ Thread.sleep(EXPIRE_THRESHOLD_MS);
+
+ // Only 10 keys should be returned after hard limit period, as 2 key is having recovery flag set
+ assertEquals(keyCount, getExpiredOpenKeys(true, BucketLayout.FILE_SYSTEM_OPTIMIZED));
+ assertExpiredOpenKeys(false, true,
+ BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+ openKeyCleanupService.resume();
+
+ // 10 keys should be recovered and there should not be any expired key pending
+ waitForOpenKeyCleanup(true, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+ // 2 keys should still remain in openKey table
+ assertEquals(2, getOpenKeyInfo(BucketLayout.FILE_SYSTEM_OPTIMIZED).size());
+ }
+
/**
* In this test, we create a bunch of incomplete MPU keys and try to run
* openKeyCleanupService on it. We make sure that none of these incomplete
@@ -326,7 +397,7 @@ private void assertExpiredOpenKeys(boolean expectedToEmpty, boolean hsync,
private int getExpiredOpenKeys(boolean hsync, BucketLayout layout) {
try {
final ExpiredOpenKeys expired = keyManager.getExpiredOpenKeys(
- EXPIRE_THRESHOLD, 100, layout);
+ EXPIRE_THRESHOLD, 100, layout, EXPIRE_THRESHOLD);
return (hsync ? expired.getHsyncKeys() : expired.getOpenKeyBuckets())
.size();
} catch (IOException e) {
@@ -334,6 +405,23 @@ private int getExpiredOpenKeys(boolean hsync, BucketLayout layout) {
}
}
+ private List getOpenKeyInfo(BucketLayout bucketLayout) {
+ List omKeyInfo = new ArrayList<>();
+
+ Table openFileTable =
+ om.getMetadataManager().getOpenKeyTable(bucketLayout);
+ try (TableIterator>
+ iterator = openFileTable.iterator()) {
+ while (iterator.hasNext()) {
+ omKeyInfo.add(iterator.next().getValue());
+ }
+
+ } catch (Exception e) {
+ }
+ return omKeyInfo;
+ }
+
+
void waitForOpenKeyCleanup(boolean hsync, BucketLayout layout)
throws Exception {
GenericTestUtils.waitFor(() -> 0 == getExpiredOpenKeys(hsync, layout),
@@ -341,7 +429,7 @@ void waitForOpenKeyCleanup(boolean hsync, BucketLayout layout)
}
private void createOpenKeys(int keyCount, boolean hsync,
- BucketLayout bucketLayout) throws IOException {
+ BucketLayout bucketLayout, boolean recovery) throws IOException {
String volume = UUID.randomUUID().toString();
String bucket = UUID.randomUUID().toString();
for (int x = 0; x < keyCount; x++) {
@@ -354,9 +442,9 @@ private void createOpenKeys(int keyCount, boolean hsync,
String key = UUID.randomUUID().toString();
createVolumeAndBucket(volume, bucket, bucketLayout);
- final int numBlocks = RandomUtils.nextInt(0, 3);
+ final int numBlocks = RandomUtils.nextInt(1, 3);
// Create the key
- createOpenKey(volume, bucket, key, numBlocks, hsync);
+ createOpenKey(volume, bucket, key, numBlocks, hsync, recovery);
}
}
@@ -380,7 +468,7 @@ private void createVolumeAndBucket(String volumeName, String bucketName,
}
private void createOpenKey(String volumeName, String bucketName,
- String keyName, int numBlocks, boolean hsync) throws IOException {
+ String keyName, int numBlocks, boolean hsync, boolean recovery) throws IOException {
OmKeyArgs keyArg =
new OmKeyArgs.Builder()
.setVolumeName(volumeName)
@@ -400,6 +488,9 @@ private void createOpenKey(String volumeName, String bucketName,
}
if (hsync) {
writeClient.hsyncKey(keyArg, session.getId());
+ if (recovery) {
+ writeClient.recoverLease(volumeName, bucketName, keyName, false);
+ }
}
}