Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,52 @@ public void testOfsHSync(boolean incrementalChunkList) throws Exception {
}
}

@Test
public void testHSyncOpenKeyCommitAfterExpiry() throws Exception {
// Set the fs.defaultFS
final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);

final Path key1 = new Path("hsync-key");
final Path key2 = new Path("key2");

try (FileSystem fs = FileSystem.get(CONF)) {
// Create key1 with hsync
try (FSDataOutputStream os = fs.create(key1, true)) {
os.write(1);
os.hsync();
// Create key2 without hsync
try (FSDataOutputStream os1 = fs.create(key2, true)) {
os1.write(1);
// There should be 2 key in openFileTable
assertThat(2 == getOpenKeyInfo(BUCKET_LAYOUT).size());
// One key will be in fileTable as hsynced
assertThat(1 == getKeyInfo(BUCKET_LAYOUT).size());

// Resume openKeyCleanupService
openKeyCleanupService.resume();
// Verify hsync openKey gets committed eventually
// Key without hsync is deleted
GenericTestUtils.waitFor(() ->
0 == getOpenKeyInfo(BUCKET_LAYOUT).size(), 1000, 12000);
// Verify only one key is still present in fileTable
assertThat(1 == getKeyInfo(BUCKET_LAYOUT).size());

// Clean up
assertTrue(fs.delete(key1, false));
waitForEmptyDeletedTable();
} catch (OMException ex) {
assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ex.getResult());
Copy link
Contributor

@ChenSammi ChenSammi Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashishkumar50 , if key2 is not explicitly deleted, will os1 still throws KEY_NOT_FOUND?

Given the issue to fix, I would suggest this new test to have one hsynced key and one normal key to cover the case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added one hsync and one normal key.

if key2 is not explicitly deleted, will os1 still throws KEY_NOT_FOUND?

Yes because when os1 is geting closed it will try to commit key and which it will not find and throws KEY_NOT_FOUND error.

}
} catch (OMException ex) {
assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ex.getResult());
} finally {
openKeyCleanupService.suspend();
}
}
}

@Test
public void testHSyncDeletedKey() throws Exception {
// Verify that a key can't be successfully hsync'ed again after it's deleted,
Expand Down Expand Up @@ -595,6 +641,21 @@ private List<OmKeyInfo> getOpenKeyInfo(BucketLayout bucketLayout) {
return omKeyInfo;
}

private List<OmKeyInfo> getKeyInfo(BucketLayout bucketLayout) {
List<OmKeyInfo> omKeyInfo = new ArrayList<>();

Table<String, OmKeyInfo> openFileTable =
cluster.getOzoneManager().getMetadataManager().getKeyTable(bucketLayout);
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
iterator = openFileTable.iterator()) {
while (iterator.hasNext()) {
omKeyInfo.add(iterator.next().getValue());
}
} catch (Exception e) {
}
return omKeyInfo;
}

@Test
public void testUncommittedBlocks() throws Exception {
waitForEmptyDeletedTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
import org.apache.hadoop.ozone.om.KeyManager;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class OpenKeyCleanupService extends BackgroundService {
private final Duration leaseThreshold;
private final int cleanupLimitPerTask;
private final AtomicLong submittedOpenKeyCount;
private final AtomicLong runCount;
private final AtomicLong callId;
private final AtomicBoolean suspended;

public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout,
Expand Down Expand Up @@ -112,20 +113,10 @@ public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout,
OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT);

this.submittedOpenKeyCount = new AtomicLong(0);
this.runCount = new AtomicLong(0);
this.callId = new AtomicLong(0);
this.suspended = new AtomicBoolean(false);
}

/**
* Returns the number of times this Background service has run.
*
* @return Long, run count.
*/
@VisibleForTesting
public long getRunCount() {
return runCount.get();
}

/**
* Suspend the service (for testing).
*/
Expand Down Expand Up @@ -189,7 +180,6 @@ public BackgroundTaskResult call() throws Exception {
if (!shouldRun()) {
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
runCount.incrementAndGet();
long startTime = Time.monotonicNow();
final ExpiredOpenKeys expiredOpenKeys;
try {
Expand Down Expand Up @@ -244,6 +234,7 @@ private OMRequest createCommitKeyRequest(
.setCmdType(Type.CommitKey)
.setCommitKeyRequest(request)
.setClientId(clientId.toString())
.setVersion(ClientVersion.CURRENT_VERSION)
.build();
}

Expand All @@ -265,7 +256,7 @@ private OMRequest createDeleteOpenKeysRequest(

private OMResponse submitRequest(OMRequest omRequest) {
try {
return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, callId.incrementAndGet());
} catch (ServiceException e) {
LOG.error("Open key " + omRequest.getCmdType()
+ " request failed. Will retry at next run.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ public void testCleanupExpiredOpenKeys(
// wait for submitted tasks to complete
Thread.sleep(SERVICE_INTERVAL);
final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
final long oldrunCount = openKeyCleanupService.getRunCount();
LOG.info("oldkeyCount={}, oldrunCount={}", oldkeyCount, oldrunCount);
LOG.info("oldkeyCount={}", oldkeyCount);

final OMMetrics metrics = om.getMetrics();
long numKeyHSyncs = metrics.getNumKeyHSyncs();
Expand All @@ -189,9 +188,6 @@ public void testCleanupExpiredOpenKeys(
GenericTestUtils.waitFor(
() -> openKeyCleanupService.getSubmittedOpenKeyCount() >= oldkeyCount + keyCount,
SERVICE_INTERVAL, WAIT_TIME);
GenericTestUtils.waitFor(
() -> openKeyCleanupService.getRunCount() >= oldrunCount + 2,
SERVICE_INTERVAL, WAIT_TIME);

waitForOpenKeyCleanup(false, BucketLayout.DEFAULT);
waitForOpenKeyCleanup(hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED);
Expand Down Expand Up @@ -332,8 +328,7 @@ public void testExcludeMPUOpenKeys(
// wait for submitted tasks to complete
Thread.sleep(SERVICE_INTERVAL);
final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
final long oldrunCount = openKeyCleanupService.getRunCount();
LOG.info("oldMpuKeyCount={}, oldMpuRunCount={}", oldkeyCount, oldrunCount);
LOG.info("oldMpuKeyCount={}", oldkeyCount);

final OMMetrics metrics = om.getMetrics();
long numKeyHSyncs = metrics.getNumKeyHSyncs();
Expand All @@ -353,13 +348,8 @@ public void testExcludeMPUOpenKeys(
BucketLayout.FILE_SYSTEM_OPTIMIZED);

openKeyCleanupService.resume();

GenericTestUtils.waitFor(
() -> openKeyCleanupService.getRunCount() >= oldrunCount + 2,
SERVICE_INTERVAL, WAIT_TIME);

// wait for requests to complete
Thread.sleep(SERVICE_INTERVAL);
// wait for openKeyCleanupService to complete at least once
Thread.sleep(SERVICE_INTERVAL * 2);

// No expired open keys fetched
assertEquals(openKeyCleanupService.getSubmittedOpenKeyCount(), oldkeyCount);
Expand Down Expand Up @@ -397,8 +387,7 @@ public void testCleanupExpiredOpenMPUPartKeys(
// wait for submitted tasks to complete
Thread.sleep(SERVICE_INTERVAL);
final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
final long oldrunCount = openKeyCleanupService.getRunCount();
LOG.info("oldMpuKeyCount={}, oldMpuRunCount={}", oldkeyCount, oldrunCount);
LOG.info("oldMpuKeyCount={},", oldkeyCount);

final OMMetrics metrics = om.getMetrics();
long numOpenKeysCleaned = metrics.getNumOpenKeysCleaned();
Expand All @@ -423,9 +412,6 @@ public void testCleanupExpiredOpenMPUPartKeys(
GenericTestUtils.waitFor(
() -> openKeyCleanupService.getSubmittedOpenKeyCount() >= oldkeyCount + partCount,
SERVICE_INTERVAL, WAIT_TIME);
GenericTestUtils.waitFor(
() -> openKeyCleanupService.getRunCount() >= oldrunCount + 2,
SERVICE_INTERVAL, WAIT_TIME);

// No expired MPU parts fetched
waitForOpenKeyCleanup(false, BucketLayout.DEFAULT);
Expand Down
Loading