Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SstFilteringService;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
Expand Down Expand Up @@ -572,7 +573,7 @@ private void checkIfSnapshotGetsProcessedBySFS(OzoneManager ozoneManager)
} catch (IOException e) {
fail();
}
return snapshotInfo.isSstFiltered();
return SstFilteringService.isSstFiltered(ozoneManager.getConfiguration(), snapshotInfo);
}, 1000, 10000);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import org.apache.hadoop.ozone.snapshot.ListSnapshotResponse;
import org.apache.hadoop.ozone.storage.proto.
OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,13 @@ public static String getSnapshotPrefix(String snapshotName) {
snapshotName + OM_KEY_PREFIX;
}

public static Path getSnapshotPath(OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo) {
RDBStore store = (RDBStore) omMetadataManager.getStore();
String checkpointPrefix = store.getDbLocation().getName();
return Paths.get(store.getSnapshotsParentDir(),
checkpointPrefix + snapshotInfo.getCheckpointDir());
}

public static String getSnapshotPath(OzoneConfiguration conf,
SnapshotInfo snapshotInfo) {
return OMStorage.getOmDbDir(conf) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.om;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
Expand All @@ -38,6 +39,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -69,6 +73,10 @@ public class SstFilteringService extends BackgroundService
// multiple times.
private static final int SST_FILTERING_CORE_POOL_SIZE = 1;

public static final String SST_FILTERED_FILE = "sstFiltered";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we have .txt or any other text file extension?

private static final byte[] SST_FILTERED_FILE_CONTENT = StringUtils.string2Bytes("This directory holds information " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static final byte[] SST_FILTERED_FILE_CONTENT = StringUtils.string2Bytes("This directory holds information " +
private static final byte[] SST_FILTERED_FILE_CONTENT = StringUtils.string2Bytes("This file holds information " +
"if a particular snapshot has filtered out the irrelevant sst files or not.\nDO NOT add, change or delete " +
"this file unless you know what you are doing.\n");

"if a particular snapshot has filtered out the relevant sst files or not.\nDO NOT add, change or delete " +
"any files in this directory unless you know what you are doing.\n");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"any files in this directory unless you know what you are doing.\n");
"this file unless you know what you are doing.\n");

Copy link
Contributor Author

@swamirishi swamirishi Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is inside the rocksdb directory. The statement is correct right isn't? Basically asking people not to change any file in this directory.

private final OzoneManager ozoneManager;

// Number of files to be batched in an iteration.
Expand All @@ -78,6 +86,12 @@ public class SstFilteringService extends BackgroundService

private AtomicBoolean running;

public static boolean isSstFiltered(OzoneConfiguration ozoneConfiguration, SnapshotInfo snapshotInfo) {
Path sstFilteredFile = Paths.get(OmSnapshotManager.getSnapshotPath(ozoneConfiguration,
snapshotInfo), SST_FILTERED_FILE);
return snapshotInfo.isSstFiltered() || sstFilteredFile.toFile().exists();
}

public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
OzoneManager ozoneManager, OzoneConfiguration configuration) {
super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
Expand Down Expand Up @@ -114,31 +128,28 @@ private class SstFilteringTask implements BackgroundTask {


/**
* Marks the SSTFiltered flag corresponding to the snapshot.
* @param volume Volume name of the snapshot
* @param bucket Bucket name of the snapshot
* @param snapshotName Snapshot name
* Marks the snapshot as SSTFiltered by creating a file in snapshot directory.
* @param snapshotInfo snapshotInfo
* @throws IOException
*/
private void markSSTFilteredFlagForSnapshot(String volume, String bucket,
String snapshotName) throws IOException {
private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IOException {
// Acquiring read lock to avoid race condition with the snapshot directory deletion occuring
// in OmSnapshotPurgeResponse. Any operation apart from delete can run in parallel along with operation.
OMLockDetails omLockDetails = ozoneManager.getMetadataManager().getLock()
.acquireWriteLock(SNAPSHOT_LOCK, volume, bucket, snapshotName);
.acquireReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
Table<String, SnapshotInfo> snapshotInfoTable =
ozoneManager.getMetadataManager().getSnapshotInfoTable();
String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo);
try {
// mark the snapshot as filtered by writing to the file
String snapshotTableKey = SnapshotInfo.getTableKey(volume, bucket,
snapshotName);
SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);

snapshotInfo.setSstFiltered(true);
snapshotInfoTable.put(snapshotTableKey, snapshotInfo);
// mark the snapshot as filtered by creating a file.
if (Files.exists(Paths.get(snapshotDir))) {
Files.write(Paths.get(snapshotDir, SST_FILTERED_FILE), SST_FILTERED_FILE_CONTENT);
}
} finally {
ozoneManager.getMetadataManager().getLock()
.releaseWriteLock(SNAPSHOT_LOCK, volume, bucket, snapshotName);
.releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
}
}
}
Expand Down Expand Up @@ -167,8 +178,7 @@ public BackgroundTaskResult call() throws Exception {
Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
String snapShotTableKey = keyValue.getKey();
SnapshotInfo snapshotInfo = keyValue.getValue();

if (snapshotInfo.isSstFiltered()) {
if (isSstFiltered(ozoneManager.getConfiguration(), snapshotInfo)) {
continue;
}

Expand All @@ -194,6 +204,9 @@ public BackgroundTaskResult call() throws Exception {
.lock()) {
db.deleteFilesNotMatchingPrefix(columnFamilyNameToPrefixMap);
}
markSSTFilteredFlagForSnapshot(snapshotInfo);
snapshotLimit--;
snapshotFilteredCount.getAndIncrement();
} catch (OMException ome) {
// FILE_NOT_FOUND is obtained when the snapshot is deleted
// In this case, get the snapshotInfo from the db, check if
Expand All @@ -210,10 +223,6 @@ public BackgroundTaskResult call() throws Exception {
}
}
}
markSSTFilteredFlagForSnapshot(snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
snapshotLimit--;
snapshotFilteredCount.getAndIncrement();
} catch (RocksDBException | IOException e) {
LOG.error("Exception encountered while filtering a snapshot", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we log it at an info level, if SstFileringService fails because the snapshot is purged? Otherwise, it will be unnecessary noisy logs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot be sure why the issue occured. RocksDbException can thrown for many reasons. One of them being the db being closed by another thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's why I asked to log it at the info level if it fails because snapshot dir doesn't exist which will be the case when it is purged. RocksDB throws exception like org.rocksdb.RocksDBException: While mkdir if missing: path_to_db_dir: No such file or directory..

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
Expand All @@ -33,11 +34,11 @@
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK;

/**
* Response for OMSnapshotPurgeRequest.
Expand Down Expand Up @@ -116,15 +117,23 @@ private void updateSnapInfo(OmMetadataManagerImpl metadataManager,
*/
private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
SnapshotInfo snapshotInfo) {
RDBStore store = (RDBStore) omMetadataManager.getStore();
String checkpointPrefix = store.getDbLocation().getName();
Path snapshotDirPath = Paths.get(store.getSnapshotsParentDir(),
checkpointPrefix + snapshotInfo.getCheckpointDir());
try {
FileUtils.deleteDirectory(snapshotDirPath.toFile());
} catch (IOException ex) {
LOG.error("Failed to delete snapshot directory {} for snapshot {}",
snapshotDirPath, snapshotInfo.getTableKey(), ex);
// Acquiring write lock to avoid race condition with sst filtering service which creates a sst filtered file
// inside the snapshot directory. Any operation apart from delete can run in parallel along with operation.
OMLockDetails omLockDetails = omMetadataManager.getLock()
.acquireWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
try {
FileUtils.deleteDirectory(snapshotDirPath.toFile());
} catch (IOException ex) {
LOG.error("Failed to delete snapshot directory {} for snapshot {}",
snapshotDirPath, snapshotInfo.getTableKey(), ex);
} finally {
omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
Expand Down Expand Up @@ -100,7 +99,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
private final long snapshotDeletionPerTask;
private final int keyLimitPerSnapshot;
private final int ratisByteLimit;
private final boolean isSstFilteringServiceEnabled;

public SnapshotDeletingService(long interval, long serviceTimeout,
OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient)
Expand Down Expand Up @@ -128,8 +126,6 @@ public SnapshotDeletingService(long interval, long serviceTimeout,
this.keyLimitPerSnapshot = conf.getInt(
OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK,
OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);

this.isSstFilteringServiceEnabled = ((KeyManagerImpl) ozoneManager.getKeyManager()).isSstFilteringSvcEnabled();
}

private class SnapshotDeletingTask implements BackgroundTask {
Expand Down Expand Up @@ -594,8 +590,7 @@ public void submitRequest(OMRequest omRequest) {
@VisibleForTesting
boolean shouldIgnoreSnapshot(SnapshotInfo snapInfo) {
SnapshotInfo.SnapshotStatus snapshotStatus = snapInfo.getSnapshotStatus();
return snapshotStatus != SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED
|| (isSstFilteringServiceEnabled && !snapInfo.isSstFiltered());
return snapshotStatus != SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED;
}

// TODO: Move this util class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,23 @@ private static Stream<Arguments> testCasesForIgnoreSnapshotGc() {
SnapshotInfo filteredSnapshot = SnapshotInfo.newBuilder().setSstFiltered(true).setName("snap1").build();
SnapshotInfo unFilteredSnapshot = SnapshotInfo.newBuilder().setSstFiltered(false).setName("snap1").build();
return Stream.of(
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, true, false),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, true, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, false, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, false, true));
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true));
}

@ParameterizedTest
@MethodSource("testCasesForIgnoreSnapshotGc")
public void testProcessSnapshotLogicInSDS(SnapshotInfo snapshotInfo,
SnapshotInfo.SnapshotStatus status,
boolean sstFilteringServiceEnabled,
boolean expectedOutcome)
throws IOException {
Mockito.when(keyManager.isSstFilteringSvcEnabled()).thenReturn(sstFilteringServiceEnabled);
Mockito.when(omMetadataManager.getSnapshotChainManager()).thenReturn(chainManager);
Mockito.when(ozoneManager.getKeyManager()).thenReturn(keyManager);
Mockito.when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
Mockito.when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
Mockito.when(ozoneManager.getConfiguration()).thenReturn(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testIrrelevantSstFileDeletion()
createSnapshot(volumeName, bucketName2, snapshotName1);
SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
.get(SnapshotInfo.getTableKey(volumeName, bucketName2, snapshotName1));
assertFalse(snapshotInfo.isSstFiltered());
assertFalse(SstFilteringService.isSstFiltered(om.getConfiguration(), snapshotInfo));
waitForSnapshotsAtLeast(filteringService, countExistingSnapshots + 1);
assertEquals(countExistingSnapshots + 1, filteringService.getSnapshotFilteredCount().get());

Expand Down Expand Up @@ -238,8 +238,9 @@ public void testIrrelevantSstFileDeletion()

// Need to read the sstFiltered flag which is set in background process and
// hence snapshotInfo.isSstFiltered() may not work sometimes.
assertTrue(om.getMetadataManager().getSnapshotInfoTable().get(SnapshotInfo
.getTableKey(volumeName, bucketName2, snapshotName1)).isSstFiltered());
assertTrue(SstFilteringService.isSstFiltered(om.getConfiguration(),
om.getMetadataManager().getSnapshotInfoTable().get(SnapshotInfo
.getTableKey(volumeName, bucketName2, snapshotName1))));

String snapshotName2 = "snapshot2";
final long count;
Expand Down Expand Up @@ -313,7 +314,7 @@ public void testActiveAndDeletedSnapshotCleanup() throws Exception {
.filter(f -> f.getName().endsWith(SST_FILE_EXTENSION)).count();

// delete snap1
writeClient.deleteSnapshot(volumeName, bucketNames.get(0), "snap1");
deleteSnapshot(volumeName, bucketNames.get(0), "snap1");
sstFilteringService.resume();
// Filtering service will only act on snap2 as it is an active snaphot
waitForSnapshotsAtLeast(sstFilteringService, countTotalSnapshots);
Expand Down Expand Up @@ -505,4 +506,9 @@ private void createSnapshot(String volumeName, String bucketName, String snapsho
writeClient.createSnapshot(volumeName, bucketName, snapshotName);
countTotalSnapshots++;
}

private void deleteSnapshot(String volumeName, String bucketName, String snapshotName) throws IOException {
writeClient.deleteSnapshot(volumeName, bucketName, snapshotName);
countTotalSnapshots--;
}
}