Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,17 @@
</description>
</property>

<property>
<name>ozone.om.lease.hard.limit</name>
<value>7d</value>
<tag>OZONE, OM, PERFORMANCE</tag>
<description>
Controls how long an open hsync key is considered as active. Specifically, if a hsync key
has been open longer than the value of this config entry, that open hsync key is considered as
expired (e.g. due to client crash). Unit could be defined with postfix (ns,ms,s,m,h,d)
Copy link
Contributor

@smengcl smengcl Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if an admin misconfigures this to 7ms? A key will almost immediately get committed after a client calls hsync() if OpenKeyCleanupService is triggered at that moment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point is, should we have a lower limit on this config?

Copy link
Contributor

@ChenSammi ChenSammi Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add one check, the hard limit should not be less than the soft limit.

For most of duration properties in Ozone, if some of them are related, then we will check whether one is smaller than another. Talking about misconfiguration, it's hard to draw a bar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChenSammi I agree that it is generally hard to prevent all misconfigurations.

I was prompted when I saw these ns,ms units in the description. :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added check for hard limit should not be less than the soft limit, and made it equal if it is so.

</description>
</property>

<property>
<name>ozone.om.open.key.cleanup.limit.per.task</name>
<value>1000</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ private OMConfigKeys() {
public static final String OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT =
"7d";

public static final String OZONE_OM_LEASE_HARD_LIMIT =
"ozone.om.lease.hard.limit";
public static final String OZONE_OM_LEASE_HARD_LIMIT_DEFAULT =
"7d";

public static final String OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK =
"ozone.om.open.key.cleanup.limit.per.task";
public static final int OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ List<OmVolumeArgs> listVolumes(String userName, String prefix,
* @throws IOException
*/
ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
BucketLayout bucketLayout) throws IOException;
BucketLayout bucketLayout, Duration leaseThreshold) throws IOException;

/**
* Returns the names of up to {@code count} MPU key whose age is greater
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,12 @@ List<RepeatedOmKeyInfo> listTrash(String volumeName, String bucketName,
* @param count The maximum number of expired open keys to return.
* @param expireThreshold The threshold of open key expiration age.
* @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO).
* @param leaseThreshold The threshold of hsync key.
* @return the expired open keys.
* @throws IOException
*/
ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
BucketLayout bucketLayout) throws IOException;
BucketLayout bucketLayout, Duration leaseThreshold) throws IOException;

/**
* Returns the MPU infos of up to {@code count} whose age is greater
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,9 +659,9 @@ public PendingKeysDeletion getPendingDeletionKeys(final int count)

@Override
public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
int count, BucketLayout bucketLayout, Duration leaseThreshold) throws IOException {
return metadataManager.getExpiredOpenKeys(expireThreshold, count,
bucketLayout);
bucketLayout, leaseThreshold);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1844,7 +1844,7 @@ public long getTotalOpenKeyCount() throws IOException {

@Override
public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
int count, BucketLayout bucketLayout, Duration leaseThreshold) throws IOException {
final ExpiredOpenKeys expiredKeys = new ExpiredOpenKeys();

final Table<String, OmKeyInfo> kt = getKeyTable(bucketLayout);
Expand All @@ -1857,6 +1857,8 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
final long expiredCreationTimestamp =
expireThreshold.negated().plusMillis(Time.now()).toMillis();

final long expiredLeaseTimestamp =
leaseThreshold.negated().plusMillis(Time.now()).toMillis();

int num = 0;
while (num < count && keyValueTableIterator.hasNext()) {
Expand All @@ -1871,7 +1873,8 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
continue;
}

if (openKeyInfo.getCreationTime() <= expiredCreationTimestamp) {
if (openKeyInfo.getCreationTime() <= expiredCreationTimestamp ||
openKeyInfo.getModificationTime() <= expiredLeaseTimestamp) {
final String clientIdString
= dbOpenKeyName.substring(lastPrefix + 1);

Expand All @@ -1882,10 +1885,12 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
.filter(id -> id.equals(clientIdString))
.isPresent();

if (!isHsync) {
if (!isHsync && openKeyInfo.getCreationTime() <= expiredCreationTimestamp) {
// add non-hsync'ed keys
expiredKeys.addOpenKey(openKeyInfo, dbOpenKeyName);
} else {
num++;
} else if (isHsync && openKeyInfo.getModificationTime() <= expiredLeaseTimestamp &&
!openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
Comment on lines +1892 to +1893
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else if (isHsync && openKeyInfo.getModificationTime() <= expiredLeaseTimestamp &&
!openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
} else if (isHsync && openKeyInfo.getModificationTime() <= expiredLeaseTimestamp) {

What if the client performing lease recovery crashes before committing the final change? The file would become recovery-in-progress forever and can't be closed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When manual recovery is performed on a file we are skipping those file for auto recovery. These file can be manually recovered any time. We are skipping this to avoid any data loss in this case as the exact length may not get updated during auto recovery.
@ChenSammi can you please help to confirm if we can remove this check?

Copy link
Contributor

@ChenSammi ChenSammi Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If the manual recovery client crashes before the final file commit, then this file will be kept in the openKeyTable. User can rerun the manual recovery through CLI again to recover the file later.
The reason we let this file in openKeyTable is auto-recovery will inevitably lost some data in file's last block. So if user showed the intent to recover the file manually, we need to keep the chance for the user.

// add hsync'ed keys
final KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(info.getVolumeName())
Expand All @@ -1903,8 +1908,8 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
info.getReplicationConfig(), keyArgs);

expiredKeys.addHsyncKey(keyArgs, Long.parseLong(clientIdString));
num++;
}
num++;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ private RecoverLeaseResponse doWork(OzoneManager ozoneManager,
// refresh last block pipeline
ContainerWithPipeline containerWithPipeline =
ozoneManager.getScmClient().getContainerClient().getContainerWithPipeline(finalBlock.getContainerID());
finalBlock.setPipeline(containerWithPipeline.getPipeline());
if (containerWithPipeline != null) {
finalBlock.setPipeline(containerWithPipeline.getPipeline());
}

RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder();
rb.setKeyInfo(returnKeyInfo ? keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class OpenKeyCleanupService extends BackgroundService {
// service, not the client.
private final ClientId clientId = ClientId.randomId();
private final Duration expireThreshold;
private final Duration leaseThreshold;
private final int cleanupLimitPerTask;
private final AtomicLong submittedOpenKeyCount;
private final AtomicLong runCount;
Expand All @@ -96,6 +97,10 @@ public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout,
TimeUnit.MILLISECONDS);
this.expireThreshold = Duration.ofMillis(expireMillis);

long leaseMillis = conf.getTimeDuration(OMConfigKeys.OZONE_OM_LEASE_HARD_LIMIT,
OMConfigKeys.OZONE_OM_LEASE_HARD_LIMIT_DEFAULT, TimeUnit.MILLISECONDS);
this.leaseThreshold = Duration.ofMillis(leaseMillis);

this.cleanupLimitPerTask = conf.getInt(
OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK,
OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT);
Expand Down Expand Up @@ -178,13 +183,12 @@ public BackgroundTaskResult call() throws Exception {
if (!shouldRun()) {
return BackgroundTaskResult.EmptyTaskResult.newResult();
}

runCount.incrementAndGet();
long startTime = Time.monotonicNow();
final ExpiredOpenKeys expiredOpenKeys;
try {
expiredOpenKeys = keyManager.getExpiredOpenKeys(expireThreshold,
cleanupLimitPerTask, bucketLayout);
cleanupLimitPerTask, bucketLayout, leaseThreshold);
} catch (IOException e) {
LOG.error("Unable to get hanging open keys, retry in next interval", e);
return BackgroundTaskResult.EmptyTaskResult.newResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,23 +731,23 @@ public void testGetExpiredOpenKeys(BucketLayout bucketLayout)
// Test retrieving fewer expired keys than actually exist.
final Collection<OpenKeyBucket.Builder> someExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
numExpiredOpenKeys - 1, bucketLayout).getOpenKeyBuckets();
numExpiredOpenKeys - 1, bucketLayout, expireThreshold).getOpenKeyBuckets();
List<String> names = getOpenKeyNames(someExpiredKeys);
assertEquals(numExpiredOpenKeys - 1, names.size());
assertThat(expiredKeys).containsAll(names);

// Test attempting to retrieving more expired keys than actually exist.
Collection<OpenKeyBucket.Builder> allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
numExpiredOpenKeys + 1, bucketLayout).getOpenKeyBuckets();
numExpiredOpenKeys + 1, bucketLayout, expireThreshold).getOpenKeyBuckets();
names = getOpenKeyNames(allExpiredKeys);
assertEquals(numExpiredOpenKeys, names.size());
assertThat(expiredKeys).containsAll(names);

// Test retrieving exact amount of expired keys that exist.
allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
numExpiredOpenKeys, bucketLayout).getOpenKeyBuckets();
numExpiredOpenKeys, bucketLayout, expireThreshold).getOpenKeyBuckets();
names = getOpenKeyNames(allExpiredKeys);
assertEquals(numExpiredOpenKeys, names.size());
assertThat(expiredKeys).containsAll(names);
Expand Down Expand Up @@ -805,7 +805,7 @@ public void testGetExpiredOpenKeysExcludeMPUKeys(

// Return empty since only MPU-related open keys exist.
assertTrue(omMetadataManager.getExpiredOpenKeys(expireThreshold,
numExpiredMPUOpenKeys, bucketLayout).getOpenKeyBuckets().isEmpty());
numExpiredMPUOpenKeys, bucketLayout, expireThreshold).getOpenKeyBuckets().isEmpty());


// This is for MPU-related open keys prior to isMultipartKey fix in
Expand Down Expand Up @@ -839,7 +839,7 @@ public void testGetExpiredOpenKeysExcludeMPUKeys(
// MPU-related open keys should not be fetched regardless of isMultipartKey
// flag if has the multipart upload characteristics
assertTrue(omMetadataManager.getExpiredOpenKeys(expireThreshold,
numExpiredMPUOpenKeys, bucketLayout).getOpenKeyBuckets()
numExpiredMPUOpenKeys, bucketLayout, expireThreshold).getOpenKeyBuckets()
.isEmpty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
import org.apache.hadoop.ozone.om.KeyManager;
Expand All @@ -36,6 +38,7 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
Expand All @@ -46,6 +49,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -59,9 +63,11 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_LEASE_HARD_LIMIT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -96,6 +102,9 @@ public void createConfAndInitValues(@TempDir Path tempDir) throws Exception {
SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
EXPIRE_THRESHOLD_MS, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_OM_LEASE_HARD_LIMIT,
EXPIRE_THRESHOLD_MS, TimeUnit.MILLISECONDS);
conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s");
conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
conf.setQuietMode(false);
OmTestManagers omTestManagers = new OmTestManagers(conf);
Expand Down Expand Up @@ -151,8 +160,8 @@ public void testCleanupExpiredOpenKeys(
assertEquals(0, metrics.getNumOpenKeysCleaned());
assertEquals(0, metrics.getNumOpenKeysHSyncCleaned());
final int keyCount = numDEFKeys + numFSOKeys;
createOpenKeys(numDEFKeys, false, BucketLayout.DEFAULT);
createOpenKeys(numFSOKeys, hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED);
createOpenKeys(numDEFKeys, false, BucketLayout.DEFAULT, false);
createOpenKeys(numFSOKeys, hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED, false);

// wait for open keys to expire
Thread.sleep(EXPIRE_THRESHOLD_MS);
Expand Down Expand Up @@ -184,6 +193,49 @@ public void testCleanupExpiredOpenKeys(
}
}

/**
* In this test, we create a bunch of hsync keys with some keys having recover flag set.
* OpenKeyCleanupService should commit keys which don't have recovery flag and has expired.
* Keys with recovery flag and expired should be ignored by OpenKeyCleanupService.
* @throws IOException - on Failure.
*/
@Test
@Timeout(300)
public void testIgnoreExpiredRecoverhsyncKeys() throws Exception {
OpenKeyCleanupService openKeyCleanupService =
(OpenKeyCleanupService) keyManager.getOpenKeyCleanupService();

openKeyCleanupService.suspend();
// wait for submitted tasks to complete
Thread.sleep(SERVICE_INTERVAL);
final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
LOG.info("oldkeyCount={}", oldkeyCount);
assertEquals(0, oldkeyCount);

final OMMetrics metrics = om.getMetrics();
assertEquals(0, metrics.getNumOpenKeysHSyncCleaned());
int keyCount = 10;
createOpenKeys(keyCount, true, BucketLayout.FILE_SYSTEM_OPTIMIZED, false);
// create 2 more key and mark recovery flag set
createOpenKeys(2, true, BucketLayout.FILE_SYSTEM_OPTIMIZED, true);

// wait for open keys to expire
Thread.sleep(EXPIRE_THRESHOLD_MS);

// Only 10 keys should be returned after hard limit period, as 2 key is having recovery flag set
assertEquals(keyCount, getExpiredOpenKeys(true, BucketLayout.FILE_SYSTEM_OPTIMIZED));
assertExpiredOpenKeys(false, true,
BucketLayout.FILE_SYSTEM_OPTIMIZED);

openKeyCleanupService.resume();

// 10 keys should be recovered and there should not be any expired key pending
waitForOpenKeyCleanup(true, BucketLayout.FILE_SYSTEM_OPTIMIZED);

// 2 keys should still remain in openKey table
assertEquals(2, getOpenKeyInfo(BucketLayout.FILE_SYSTEM_OPTIMIZED).size());
}

/**
* In this test, we create a bunch of incomplete MPU keys and try to run
* openKeyCleanupService on it. We make sure that none of these incomplete
Expand Down Expand Up @@ -326,22 +378,39 @@ private void assertExpiredOpenKeys(boolean expectedToEmpty, boolean hsync,
private int getExpiredOpenKeys(boolean hsync, BucketLayout layout) {
try {
final ExpiredOpenKeys expired = keyManager.getExpiredOpenKeys(
EXPIRE_THRESHOLD, 100, layout);
EXPIRE_THRESHOLD, 100, layout, EXPIRE_THRESHOLD);
return (hsync ? expired.getHsyncKeys() : expired.getOpenKeyBuckets())
.size();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private List<OmKeyInfo> getOpenKeyInfo(BucketLayout bucketLayout) {
List<OmKeyInfo> omKeyInfo = new ArrayList<>();

Table<String, OmKeyInfo> openFileTable =
om.getMetadataManager().getOpenKeyTable(bucketLayout);
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
iterator = openFileTable.iterator()) {
while (iterator.hasNext()) {
omKeyInfo.add(iterator.next().getValue());
}

} catch (Exception e) {
}
return omKeyInfo;
}


void waitForOpenKeyCleanup(boolean hsync, BucketLayout layout)
throws Exception {
GenericTestUtils.waitFor(() -> 0 == getExpiredOpenKeys(hsync, layout),
SERVICE_INTERVAL, WAIT_TIME);
}

private void createOpenKeys(int keyCount, boolean hsync,
BucketLayout bucketLayout) throws IOException {
BucketLayout bucketLayout, boolean recovery) throws IOException {
String volume = UUID.randomUUID().toString();
String bucket = UUID.randomUUID().toString();
for (int x = 0; x < keyCount; x++) {
Expand All @@ -354,9 +423,9 @@ private void createOpenKeys(int keyCount, boolean hsync,
String key = UUID.randomUUID().toString();
createVolumeAndBucket(volume, bucket, bucketLayout);

final int numBlocks = RandomUtils.nextInt(0, 3);
final int numBlocks = RandomUtils.nextInt(1, 3);
// Create the key
createOpenKey(volume, bucket, key, numBlocks, hsync);
createOpenKey(volume, bucket, key, numBlocks, hsync, recovery);
}
}

Expand All @@ -380,7 +449,7 @@ private void createVolumeAndBucket(String volumeName, String bucketName,
}

private void createOpenKey(String volumeName, String bucketName,
String keyName, int numBlocks, boolean hsync) throws IOException {
String keyName, int numBlocks, boolean hsync, boolean recovery) throws IOException {
OmKeyArgs keyArg =
new OmKeyArgs.Builder()
.setVolumeName(volumeName)
Expand All @@ -400,6 +469,9 @@ private void createOpenKey(String volumeName, String bucketName,
}
if (hsync) {
writeClient.hsyncKey(keyArg, session.getId());
if (recovery) {
writeClient.recoverLease(volumeName, bucketName, keyName, false);
}
}
}

Expand Down