hadoop-hdds/hadoop-dependency-test/pom.xml: 10 additions & 0 deletions
@@ -67,5 +67,15 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>assertj-core</artifactId>
<scope>compile</scope>
</dependency>
+    <dependency>
+      <!-- required for Surefire versions before 3.0.0-M5 -->
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.vintage</groupId>
+      <artifactId>junit-vintage-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
</dependencies>
</project>
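
Note on the two engines: junit-jupiter-engine runs JUnit 5 tests on the JUnit platform that newer Surefire versions use, while junit-vintage-engine discovers and runs JUnit 4 tests on that same platform. A minimal sketch of the kind of legacy test the vintage engine keeps alive (hypothetical class, not part of this change):

import org.junit.Assert;
import org.junit.Test;

// Hypothetical JUnit 4 test: without junit-vintage-engine on the test
// classpath, the JUnit 5 platform would not discover or run it.
public class LegacyAssertionTest {
  @Test
  public void additionStillWorks() {
    Assert.assertEquals(4, 2 + 2);
  }
}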
@@ -60,6 +60,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerStateMachineMetrics;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
@@ -340,9 +341,11 @@ public void testParallelDeleteBucketAndCreateKey() throws IOException,
omSM.getHandler().setInjector(injector);
thread1.start();
thread2.start();
-GenericTestUtils.waitFor(() -> metrics.getApplyTransactionMapSize() > 0,
-    100, 5000);
-Thread.sleep(2000);
+// Wait long enough for createKey's preExecute to finish executing
+GenericTestUtils.waitFor(() -> {
+  return getCluster().getOzoneManager().getOmServerProtocol().getLastRequestToSubmit().getCmdType().equals(
+      Type.CreateKey);
+}, 100, 10000);
injector.resume();

try {
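
The fixed Thread.sleep(2000) is replaced by a condition-based wait on the type of the last request submitted to the OM, which removes the flaky timing assumption. For reference, GenericTestUtils.waitFor polls a check until it holds or a timeout expires; a simplified sketch of those semantics (a model only, not the actual Ozone implementation):

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Simplified model of waitFor(check, checkEveryMillis, waitForMillis):
// poll the condition at a fixed interval and fail once the deadline passes.
static void waitFor(BooleanSupplier check, long checkEveryMillis,
    long waitForMillis) throws InterruptedException, TimeoutException {
  long deadline = System.currentTimeMillis() + waitForMillis;
  while (!check.getAsBoolean()) {
    if (System.currentTimeMillis() > deadline) {
      throw new TimeoutException("Condition not met within " + waitForMillis + " ms");
    }
    Thread.sleep(checkEveryMillis);
  }
}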
@@ -31,6 +31,7 @@
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.RDBCheckpointUtils;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -60,8 +61,8 @@
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.event.Level;

@@ -198,11 +199,12 @@ public void shutdown() {
}
}

-@ParameterizedTest
-@ValueSource(ints = {100})
// tried up to 1000 snapshots and this test works, but some of the
// timeouts have to be increased.
-public void testInstallSnapshot(int numSnapshotsToCreate) throws Exception {
+private static final int SNAPSHOTS_TO_CREATE = 100;
+
+@Test
+public void testInstallSnapshot(@TempDir Path tempDir) throws Exception {
// Get the leader OM
String leaderOMNodeId = OmFailoverProxyUtil
.getFailoverProxyProvider(objectStore.getClientProxy())
@@ -230,8 +232,7 @@ public void testInstallSnapshot(int numSnapshotsToCreate) throws Exception {
String snapshotName = "";
List<String> keys = new ArrayList<>();
SnapshotInfo snapshotInfo = null;
-for (int snapshotCount = 0; snapshotCount < numSnapshotsToCreate;
-    snapshotCount++) {
+for (int snapshotCount = 0; snapshotCount < SNAPSHOTS_TO_CREATE; snapshotCount++) {
snapshotName = snapshotNamePrefix + snapshotCount;
keys = writeKeys(keyIncrement);
snapshotInfo = createOzoneSnapshot(leaderOM, snapshotName);
@@ -326,7 +327,7 @@ public void testInstallSnapshot(int numSnapshotsToCreate) throws Exception {
private void checkSnapshot(OzoneManager leaderOM, OzoneManager followerOM,
String snapshotName,
List<String> keys, SnapshotInfo snapshotInfo)
-throws IOException {
+throws IOException, RocksDBException {
// Read back data from snapshot.
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
@@ -347,18 +348,28 @@ private void checkSnapshot(OzoneManager leaderOM, OzoneManager followerOM,
Path leaderActiveDir = Paths.get(leaderMetaDir.toString(), OM_DB_NAME);
Path leaderSnapshotDir =
Paths.get(getSnapshotPath(leaderOM.getConfiguration(), snapshotInfo));

+// Get list of live files on the leader.
+RocksDB activeRocksDB = ((RDBStore) leaderOM.getMetadataManager().getStore())
+    .getDb().getManagedRocksDb().get();
+// strip the leading "/".
+Set<String> liveSstFiles = activeRocksDB.getLiveFiles().files.stream()
+    .map(s -> s.substring(1))
+    .collect(Collectors.toSet());

// Get the list of hardlinks from the leader. Then confirm those links
// are on the follower
int hardLinkCount = 0;
-try (Stream<Path>list = Files.list(leaderSnapshotDir)) {
+try (Stream<Path> list = Files.list(leaderSnapshotDir)) {
for (Path leaderSnapshotSST: list.collect(Collectors.toList())) {
String fileName = leaderSnapshotSST.getFileName().toString();
if (fileName.toLowerCase().endsWith(".sst")) {

Path leaderActiveSST =
Paths.get(leaderActiveDir.toString(), fileName);
-// Skip if not hard link on the leader
-if (!leaderActiveSST.toFile().exists()) {
+// First confirm it is live
+if (!liveSstFiles.contains(fileName)) {
continue;
}
// If it is a hard link on the leader, it should be a hard
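
The live-file check above excludes SST files that RocksDB has already compacted away but whose hard links may still sit in the snapshot checkpoint directory. RocksDB#getLiveFiles() reports file names relative to the DB root with a leading "/", which is why the test strips the first character; a standalone sketch of the same idea (assumes an open org.rocksdb.RocksDB handle):

import java.util.Set;
import java.util.stream.Collectors;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Sketch: collect the names of the SSTs currently live in a RocksDB
// instance; getLiveFiles() returns entries such as "/000123.sst".
static Set<String> liveSstFileNames(RocksDB db) throws RocksDBException {
  return db.getLiveFiles().files.stream()
      .map(name -> name.startsWith("/") ? name.substring(1) : name)
      .collect(Collectors.toSet());
}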
@@ -572,7 +572,7 @@ private void checkIfSnapshotGetsProcessedBySFS(OzoneManager ozoneManager)
} catch (IOException e) {
Assertions.fail();
}
-return snapshotInfo.isSstFiltered();
+return SstFilteringService.isSstFiltered(ozoneManager.getConfiguration(), snapshotInfo);
}, 1000, 10000);
}

@@ -46,8 +46,7 @@
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
-import org.apache.hadoop.ozone.storage.proto.
-    OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -731,6 +731,13 @@ public static String getSnapshotPrefix(String snapshotName) {
snapshotName + OM_KEY_PREFIX;
}

+public static Path getSnapshotPath(OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo) {
+  RDBStore store = (RDBStore) omMetadataManager.getStore();
+  String checkpointPrefix = store.getDbLocation().getName();
+  return Paths.get(store.getSnapshotsParentDir(),
+      checkpointPrefix + snapshotInfo.getCheckpointDir());
+}

public static String getSnapshotPath(OzoneConfiguration conf,
SnapshotInfo snapshotInfo) {
return OMStorage.getOmDbDir(conf) +
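
The new overload resolves a snapshot's checkpoint directory from the live RDBStore rather than from configuration, so callers that already hold an OMMetadataManager skip the config lookup. A hypothetical call site (the LOG handle and surrounding names are assumed, not from this patch):

// Hypothetical usage of the new overload; the resolved path has the shape
// <snapshotsParentDir>/<om.db name><checkpoint dir suffix>.
Path snapshotDir = OmSnapshotManager.getSnapshotPath(
    ozoneManager.getMetadataManager(), snapshotInfo);
if (!snapshotDir.toFile().exists()) {
  LOG.warn("Snapshot checkpoint directory {} not found on disk", snapshotDir);
}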
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.om;

import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
@@ -38,6 +39,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -69,6 +73,10 @@ public class SstFilteringService extends BackgroundService
// multiple times.
private static final int SST_FILTERING_CORE_POOL_SIZE = 1;

+public static final String SST_FILTERED_FILE = "sstFiltered";
+private static final byte[] SST_FILTERED_FILE_CONTENT = StringUtils.string2Bytes("This file holds information " +
+    "if a particular snapshot has filtered out the relevant sst files or not.\nDO NOT add, change or delete " +
+    "any files in this directory unless you know what you are doing.\n");
private final OzoneManager ozoneManager;

// Number of files to be batched in an iteration.
@@ -78,6 +86,12 @@

private AtomicBoolean running;

+public static boolean isSstFiltered(OzoneConfiguration ozoneConfiguration, SnapshotInfo snapshotInfo) {
+  Path sstFilteredFile = Paths.get(OmSnapshotManager.getSnapshotPath(ozoneConfiguration,
+      snapshotInfo), SST_FILTERED_FILE);
+  return snapshotInfo.isSstFiltered() || sstFilteredFile.toFile().exists();
+}

public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
OzoneManager ozoneManager, OzoneConfiguration configuration) {
super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
@@ -112,33 +126,35 @@ public void resume() {

private class SstFilteringTask implements BackgroundTask {

+private boolean isSnapshotDeleted(SnapshotInfo snapshotInfo) {
+  return snapshotInfo == null || snapshotInfo.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED;
+}
+

/**
-* Marks the SSTFiltered flag corresponding to the snapshot.
-* @param volume Volume name of the snapshot
-* @param bucket Bucket name of the snapshot
-* @param snapshotName Snapshot name
+* Marks the snapshot as SSTFiltered by creating a file in the snapshot directory.
+* @param snapshotInfo snapshotInfo
* @throws IOException
*/
-private void markSSTFilteredFlagForSnapshot(String volume, String bucket,
-    String snapshotName) throws IOException {
+private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IOException {
+// Acquiring read lock to avoid a race condition with the snapshot directory deletion occurring
+// in OmSnapshotPurgeResponse. Any operation apart from delete can run in parallel with this operation.
+// TODO: Revisit other SNAPSHOT_LOCK uses and see if we can change write locks to read locks to optimize further.
OMLockDetails omLockDetails = ozoneManager.getMetadataManager().getLock()
-    .acquireWriteLock(SNAPSHOT_LOCK, volume, bucket, snapshotName);
+    .acquireReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
+        snapshotInfo.getName());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
-Table<String, SnapshotInfo> snapshotInfoTable =
-    ozoneManager.getMetadataManager().getSnapshotInfoTable();
+String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo);
try {
-// mark the snapshot as filtered by writing to the file
-String snapshotTableKey = SnapshotInfo.getTableKey(volume, bucket,
-    snapshotName);
-SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);
-
-snapshotInfo.setSstFiltered(true);
-snapshotInfoTable.put(snapshotTableKey, snapshotInfo);
+// mark the snapshot as filtered by creating a file.
+if (Files.exists(Paths.get(snapshotDir))) {
+  Files.write(Paths.get(snapshotDir, SST_FILTERED_FILE), SST_FILTERED_FILE_CONTENT);
+}
} finally {
ozoneManager.getMetadataManager().getLock()
-    .releaseWriteLock(SNAPSHOT_LOCK, volume, bucket, snapshotName);
+    .releaseReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
+        snapshotInfo.getBucketName(), snapshotInfo.getName());
}
}
}
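
Files.write with default open options creates the marker or truncates an existing one, which is harmless here because the content is constant. A hedged variant that makes the write-once intent explicit (an alternative sketch, not what the patch does; snapshotDir and the two constants are those in the diff, and StandardOpenOption/FileAlreadyExistsException come from java.nio.file):

// Variant sketch: create the marker exactly once and tolerate re-runs.
Path marker = Paths.get(snapshotDir, SST_FILTERED_FILE);
try {
  Files.write(marker, SST_FILTERED_FILE_CONTENT, StandardOpenOption.CREATE_NEW);
} catch (FileAlreadyExistsException ignored) {
  // A previous run already marked this snapshot; nothing to do.
}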
@@ -163,12 +179,11 @@ public BackgroundTaskResult call() throws Exception {
long snapshotLimit = snapshotLimitPerTask;

while (iterator.hasNext() && snapshotLimit > 0 && running.get()) {
+Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
+String snapShotTableKey = keyValue.getKey();
+SnapshotInfo snapshotInfo = keyValue.getValue();
try {
-  Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
-  String snapShotTableKey = keyValue.getKey();
-  SnapshotInfo snapshotInfo = keyValue.getValue();
-
-  if (snapshotInfo.isSstFiltered()) {
+  if (isSstFiltered(ozoneManager.getConfiguration(), snapshotInfo)) {
continue;
}

@@ -194,6 +209,9 @@ public BackgroundTaskResult call() throws Exception {
.lock()) {
db.deleteFilesNotMatchingPrefix(columnFamilyNameToPrefixMap);
}
+markSSTFilteredFlagForSnapshot(snapshotInfo);
+snapshotLimit--;
+snapshotFilteredCount.getAndIncrement();
} catch (OMException ome) {
// FILE_NOT_FOUND is obtained when the snapshot is deleted
// In this case, get the snapshotInfo from the db, check if
@@ -202,20 +220,22 @@ public BackgroundTaskResult call() throws Exception {
SnapshotInfo snapshotInfoToCheck =
ozoneManager.getMetadataManager().getSnapshotInfoTable()
.get(snapShotTableKey);
-if (snapshotInfoToCheck.getSnapshotStatus() ==
-    SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+if (isSnapshotDeleted(snapshotInfoToCheck)) {
LOG.info("Snapshot with name: '{}', id: '{}' has been " +
"deleted.", snapshotInfo.getName(), snapshotInfo
.getSnapshotId());
}
}
}
-markSSTFilteredFlagForSnapshot(snapshotInfo.getVolumeName(),
-    snapshotInfo.getBucketName(), snapshotInfo.getName());
-snapshotLimit--;
-snapshotFilteredCount.getAndIncrement();
} catch (RocksDBException | IOException e) {
-  LOG.error("Exception encountered while filtering a snapshot", e);
+  if (isSnapshotDeleted(snapshotInfoTable.get(snapShotTableKey))) {
+    LOG.info("Exception encountered while filtering a snapshot: {} since it was deleted midway",
+        snapShotTableKey, e);
+  } else {
+    LOG.error("Exception encountered while filtering a snapshot", e);
+  }
}
}
} catch (IOException e) {
@@ -20,10 +20,11 @@

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -33,11 +34,11 @@
import javax.annotation.Nonnull;
import java.io.IOException;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK;

/**
* Response for OMSnapshotPurgeRequest.
@@ -117,15 +118,24 @@ private void updateSnapInfo(OmMetadataManagerImpl metadataManager,
*/
private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
SnapshotInfo snapshotInfo) {
-RDBStore store = (RDBStore) omMetadataManager.getStore();
-String checkpointPrefix = store.getDbLocation().getName();
-Path snapshotDirPath = Paths.get(store.getSnapshotsParentDir(),
-    checkpointPrefix + snapshotInfo.getCheckpointDir());
-try {
-  FileUtils.deleteDirectory(snapshotDirPath.toFile());
-} catch (IOException ex) {
-  LOG.error("Failed to delete snapshot directory {} for snapshot {}",
-      snapshotDirPath, snapshotInfo.getTableKey(), ex);
+// Acquiring write lock to avoid a race condition with the SST filtering service, which creates an
+// sst-filtered marker file inside the snapshot directory. Any operation that doesn't create/delete
+// files under this snapshot directory can run in parallel with this operation.
+OMLockDetails omLockDetails = omMetadataManager.getLock()
+    .acquireWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
+        snapshotInfo.getName());
+boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
+if (acquiredSnapshotLock) {
+  Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
+  try {
+    FileUtils.deleteDirectory(snapshotDirPath.toFile());
+  } catch (IOException ex) {
+    LOG.error("Failed to delete snapshot directory {} for snapshot {}",
+        snapshotDirPath, snapshotInfo.getTableKey(), ex);
+  } finally {
+    omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
+        snapshotInfo.getBucketName(), snapshotInfo.getName());
+  }
+}
}
}
}
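
Together with the read lock taken in SstFilteringService.markSSTFilteredFlagForSnapshot, this gives the usual shared/exclusive pairing: marker writers may overlap with each other but never with directory deletion. The same pattern with a plain ReentrantReadWriteLock, as a self-contained sketch (the real OM snapshot lock is keyed per volume/bucket/snapshot, which this simplification drops):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the coordination used above: filtering (marker creation) takes
// the shared lock, purge (directory deletion) takes the exclusive lock.
class SnapshotDirLock {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void withMarkerWrite(Runnable createMarker) {
    lock.readLock().lock();   // shared: concurrent filters are fine
    try {
      createMarker.run();
    } finally {
      lock.readLock().unlock();
    }
  }

  void withDirectoryDelete(Runnable deleteDir) {
    lock.writeLock().lock();  // exclusive: blocks all marker writers
    try {
      deleteDir.run();
    } finally {
      lock.writeLock().unlock();
    }
  }
}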