Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SstFilteringService;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
Expand Down Expand Up @@ -572,7 +573,7 @@ private void checkIfSnapshotGetsProcessedBySFS(OzoneManager ozoneManager)
} catch (IOException e) {
fail();
}
return snapshotInfo.isSstFiltered();
return SstFilteringService.isSstFiltered(ozoneManager.getConfiguration(), snapshotInfo);
}, 1000, 10000);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import org.apache.hadoop.ozone.snapshot.ListSnapshotResponse;
import org.apache.hadoop.ozone.storage.proto.
OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,13 @@ public static String getSnapshotPrefix(String snapshotName) {
snapshotName + OM_KEY_PREFIX;
}

public static Path getSnapshotPath(OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo) {
RDBStore store = (RDBStore) omMetadataManager.getStore();
String checkpointPrefix = store.getDbLocation().getName();
return Paths.get(store.getSnapshotsParentDir(),
checkpointPrefix + snapshotInfo.getCheckpointDir());
}

public static String getSnapshotPath(OzoneConfiguration conf,
SnapshotInfo snapshotInfo) {
return OMStorage.getOmDbDir(conf) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.om;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
Expand All @@ -38,6 +39,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -69,6 +73,10 @@ public class SstFilteringService extends BackgroundService
// multiple times.
private static final int SST_FILTERING_CORE_POOL_SIZE = 1;

public static final String SST_FILTERED_FILE = "sstFiltered";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we have .txt or any other text file extension?

private static final byte[] SST_FILTERED_FILE_CONTENT = StringUtils.string2Bytes("This file holds information " +
"if a particular snapshot has filtered out the relevant sst files or not.\nDO NOT add, change or delete " +
"any files in this directory unless you know what you are doing.\n");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"any files in this directory unless you know what you are doing.\n");
"this file unless you know what you are doing.\n");

Copy link
Contributor Author

@swamirishi swamirishi Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is inside the rocksdb directory. The statement is correct right isn't? Basically asking people not to change any file in this directory.

private final OzoneManager ozoneManager;

// Number of files to be batched in an iteration.
Expand All @@ -78,6 +86,12 @@ public class SstFilteringService extends BackgroundService

private AtomicBoolean running;

public static boolean isSstFiltered(OzoneConfiguration ozoneConfiguration, SnapshotInfo snapshotInfo) {
Path sstFilteredFile = Paths.get(OmSnapshotManager.getSnapshotPath(ozoneConfiguration,
snapshotInfo), SST_FILTERED_FILE);
return snapshotInfo.isSstFiltered() || sstFilteredFile.toFile().exists();
}

public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
OzoneManager ozoneManager, OzoneConfiguration configuration) {
super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
Expand Down Expand Up @@ -112,33 +126,35 @@ public void resume() {

private class SstFilteringTask implements BackgroundTask {

private boolean isSnapshotDeleted(SnapshotInfo snapshotInfo) {
return snapshotInfo == null || snapshotInfo.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED;
}


/**
* Marks the SSTFiltered flag corresponding to the snapshot.
* @param volume Volume name of the snapshot
* @param bucket Bucket name of the snapshot
* @param snapshotName Snapshot name
* Marks the snapshot as SSTFiltered by creating a file in snapshot directory.
* @param snapshotInfo snapshotInfo
* @throws IOException
*/
private void markSSTFilteredFlagForSnapshot(String volume, String bucket,
String snapshotName) throws IOException {
private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IOException {
// Acquiring read lock to avoid race condition with the snapshot directory deletion occurring
// in OmSnapshotPurgeResponse. Any operation apart from delete can run in parallel along with this operation.
//TODO. Revisit other SNAPSHOT_LOCK and see if we can change write locks to read locks to further optimize it.
OMLockDetails omLockDetails = ozoneManager.getMetadataManager().getLock()
.acquireWriteLock(SNAPSHOT_LOCK, volume, bucket, snapshotName);
.acquireReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
Table<String, SnapshotInfo> snapshotInfoTable =
ozoneManager.getMetadataManager().getSnapshotInfoTable();
String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo);
try {
// mark the snapshot as filtered by writing to the file
String snapshotTableKey = SnapshotInfo.getTableKey(volume, bucket,
snapshotName);
SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);

snapshotInfo.setSstFiltered(true);
snapshotInfoTable.put(snapshotTableKey, snapshotInfo);
// mark the snapshot as filtered by creating a file.
if (Files.exists(Paths.get(snapshotDir))) {
Files.write(Paths.get(snapshotDir, SST_FILTERED_FILE), SST_FILTERED_FILE_CONTENT);
}
} finally {
ozoneManager.getMetadataManager().getLock()
.releaseWriteLock(SNAPSHOT_LOCK, volume, bucket, snapshotName);
.releaseReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
}
}
}
Expand All @@ -163,12 +179,11 @@ public BackgroundTaskResult call() throws Exception {
long snapshotLimit = snapshotLimitPerTask;

while (iterator.hasNext() && snapshotLimit > 0 && running.get()) {
Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
String snapShotTableKey = keyValue.getKey();
SnapshotInfo snapshotInfo = keyValue.getValue();
try {
Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
String snapShotTableKey = keyValue.getKey();
SnapshotInfo snapshotInfo = keyValue.getValue();

if (snapshotInfo.isSstFiltered()) {
if (isSstFiltered(ozoneManager.getConfiguration(), snapshotInfo)) {
continue;
}

Expand All @@ -194,6 +209,9 @@ public BackgroundTaskResult call() throws Exception {
.lock()) {
db.deleteFilesNotMatchingPrefix(columnFamilyNameToPrefixMap);
}
markSSTFilteredFlagForSnapshot(snapshotInfo);
snapshotLimit--;
snapshotFilteredCount.getAndIncrement();
} catch (OMException ome) {
// FILE_NOT_FOUND is obtained when the snapshot is deleted
// In this case, get the snapshotInfo from the db, check if
Expand All @@ -202,20 +220,22 @@ public BackgroundTaskResult call() throws Exception {
SnapshotInfo snapshotInfoToCheck =
ozoneManager.getMetadataManager().getSnapshotInfoTable()
.get(snapShotTableKey);
if (snapshotInfoToCheck.getSnapshotStatus() ==
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
if (isSnapshotDeleted(snapshotInfoToCheck)) {
LOG.info("Snapshot with name: '{}', id: '{}' has been " +
"deleted.", snapshotInfo.getName(), snapshotInfo
.getSnapshotId());
}
}
}
markSSTFilteredFlagForSnapshot(snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
snapshotLimit--;
snapshotFilteredCount.getAndIncrement();
} catch (RocksDBException | IOException e) {
LOG.error("Exception encountered while filtering a snapshot", e);
if (isSnapshotDeleted(snapshotInfoTable.get(snapShotTableKey))) {
LOG.info("Exception encountered while filtering a snapshot: {} since it was deleted midway",
snapShotTableKey, e);
} else {
LOG.error("Exception encountered while filtering a snapshot", e);
}


}
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
Expand All @@ -33,11 +34,11 @@
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK;

/**
* Response for OMSnapshotPurgeRequest.
Expand Down Expand Up @@ -116,15 +117,24 @@ private void updateSnapInfo(OmMetadataManagerImpl metadataManager,
*/
private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
SnapshotInfo snapshotInfo) {
RDBStore store = (RDBStore) omMetadataManager.getStore();
String checkpointPrefix = store.getDbLocation().getName();
Path snapshotDirPath = Paths.get(store.getSnapshotsParentDir(),
checkpointPrefix + snapshotInfo.getCheckpointDir());
try {
FileUtils.deleteDirectory(snapshotDirPath.toFile());
} catch (IOException ex) {
LOG.error("Failed to delete snapshot directory {} for snapshot {}",
snapshotDirPath, snapshotInfo.getTableKey(), ex);
// Acquiring write lock to avoid race condition with sst filtering service which creates a sst filtered file
// inside the snapshot directory. Any operation apart which doesn't create/delete files under this snapshot
// directory can run in parallel along with this operation.
OMLockDetails omLockDetails = omMetadataManager.getLock()
.acquireWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
try {
FileUtils.deleteDirectory(snapshotDirPath.toFile());
} catch (IOException ex) {
LOG.error("Failed to delete snapshot directory {} for snapshot {}",
snapshotDirPath, snapshotInfo.getTableKey(), ex);
} finally {
omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
Expand Down Expand Up @@ -100,7 +99,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
private final long snapshotDeletionPerTask;
private final int keyLimitPerSnapshot;
private final int ratisByteLimit;
private final boolean isSstFilteringServiceEnabled;

public SnapshotDeletingService(long interval, long serviceTimeout,
OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient)
Expand Down Expand Up @@ -128,8 +126,6 @@ public SnapshotDeletingService(long interval, long serviceTimeout,
this.keyLimitPerSnapshot = conf.getInt(
OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK,
OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);

this.isSstFilteringServiceEnabled = ((KeyManagerImpl) ozoneManager.getKeyManager()).isSstFilteringSvcEnabled();
}

private class SnapshotDeletingTask implements BackgroundTask {
Expand Down Expand Up @@ -594,8 +590,7 @@ public void submitRequest(OMRequest omRequest) {
@VisibleForTesting
boolean shouldIgnoreSnapshot(SnapshotInfo snapInfo) {
SnapshotInfo.SnapshotStatus snapshotStatus = snapInfo.getSnapshotStatus();
return snapshotStatus != SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED
|| (isSstFilteringServiceEnabled && !snapInfo.isSstFiltered());
return snapshotStatus != SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED;
}

// TODO: Move this util class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,23 @@ private static Stream<Arguments> testCasesForIgnoreSnapshotGc() {
SnapshotInfo filteredSnapshot = SnapshotInfo.newBuilder().setSstFiltered(true).setName("snap1").build();
SnapshotInfo unFilteredSnapshot = SnapshotInfo.newBuilder().setSstFiltered(false).setName("snap1").build();
return Stream.of(
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, true, false),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, true, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, false, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, false, true));
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED, false),
Arguments.of(unFilteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true),
Arguments.of(filteredSnapshot, SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, true));
}

@ParameterizedTest
@MethodSource("testCasesForIgnoreSnapshotGc")
public void testProcessSnapshotLogicInSDS(SnapshotInfo snapshotInfo,
SnapshotInfo.SnapshotStatus status,
boolean sstFilteringServiceEnabled,
boolean expectedOutcome)
throws IOException {
Mockito.when(keyManager.isSstFilteringSvcEnabled()).thenReturn(sstFilteringServiceEnabled);
Mockito.when(omMetadataManager.getSnapshotChainManager()).thenReturn(chainManager);
Mockito.when(ozoneManager.getKeyManager()).thenReturn(keyManager);
Mockito.when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
Mockito.when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
Mockito.when(ozoneManager.getConfiguration()).thenReturn(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testIrrelevantSstFileDeletion()
createSnapshot(volumeName, bucketName2, snapshotName1);
SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
.get(SnapshotInfo.getTableKey(volumeName, bucketName2, snapshotName1));
assertFalse(snapshotInfo.isSstFiltered());
assertFalse(SstFilteringService.isSstFiltered(om.getConfiguration(), snapshotInfo));
waitForSnapshotsAtLeast(filteringService, countExistingSnapshots + 1);
assertEquals(countExistingSnapshots + 1, filteringService.getSnapshotFilteredCount().get());

Expand Down Expand Up @@ -238,8 +238,9 @@ public void testIrrelevantSstFileDeletion()

// Need to read the sstFiltered flag which is set in background process and
// hence snapshotInfo.isSstFiltered() may not work sometimes.
assertTrue(om.getMetadataManager().getSnapshotInfoTable().get(SnapshotInfo
.getTableKey(volumeName, bucketName2, snapshotName1)).isSstFiltered());
assertTrue(SstFilteringService.isSstFiltered(om.getConfiguration(),
om.getMetadataManager().getSnapshotInfoTable().get(SnapshotInfo
.getTableKey(volumeName, bucketName2, snapshotName1))));

String snapshotName2 = "snapshot2";
final long count;
Expand Down Expand Up @@ -313,7 +314,7 @@ public void testActiveAndDeletedSnapshotCleanup() throws Exception {
.filter(f -> f.getName().endsWith(SST_FILE_EXTENSION)).count();

// delete snap1
writeClient.deleteSnapshot(volumeName, bucketNames.get(0), "snap1");
deleteSnapshot(volumeName, bucketNames.get(0), "snap1");
sstFilteringService.resume();
// Filtering service will only act on snap2 as it is an active snaphot
waitForSnapshotsAtLeast(sstFilteringService, countTotalSnapshots);
Expand Down Expand Up @@ -505,4 +506,9 @@ private void createSnapshot(String volumeName, String bucketName, String snapsho
writeClient.createSnapshot(volumeName, bucketName, snapshotName);
countTotalSnapshots++;
}

private void deleteSnapshot(String volumeName, String bucketName, String snapshotName) throws IOException {
writeClient.deleteSnapshot(volumeName, bucketName, snapshotName);
countTotalSnapshots--;
}
}