Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1421,16 +1421,21 @@ public String getCompactionLogDir() {
* those are not needed to generate snapshot diff. These files are basically
* non-leaf nodes of the DAG.
*/
public synchronized void pruneSstFiles() {
public void pruneSstFiles() {
if (!shouldRun()) {
return;
}

Set<String> nonLeafSstFiles;
nonLeafSstFiles = forwardCompactionDAG.nodes().stream()
.filter(node -> !forwardCompactionDAG.successors(node).isEmpty())
.map(node -> node.getFileName())
.collect(Collectors.toSet());
// This is synchronized because compaction thread can update the compactionDAG and can be in situation
// when nodes are added to the graph, but arcs are still in progress.
// Hence, the lock is taken.
synchronized (this) {
nonLeafSstFiles = forwardCompactionDAG.nodes().stream()
.filter(node -> !forwardCompactionDAG.successors(node).isEmpty())
.map(node -> node.getFileName())
.collect(Collectors.toSet());
}

if (CollectionUtils.isNotEmpty(nonLeafSstFiles)) {
LOG.info("Removing SST files: {} as part of SST file pruning.",
Expand All @@ -1448,8 +1453,13 @@ public void incrementTarballRequestCount() {
tarballRequestCount.incrementAndGet();
}

public void decrementTarballRequestCount() {
tarballRequestCount.decrementAndGet();
public void decrementTarballRequestCountAndNotify() {
// Synchronized block is used to ensure that lock is on the same instance notifyAll is being called.
synchronized (this) {
tarballRequestCount.decrementAndGet();
// Notify compaction threads to continue.
notifyAll();
}
}

public boolean shouldRun() {
Expand Down Expand Up @@ -1517,8 +1527,7 @@ public static RocksDBCheckpointDiffer getInstance(
* for cache.
*/
public static void invalidateCacheEntry(String cacheKey) {
IOUtils.closeQuietly(INSTANCE_MAP.get(cacheKey));
INSTANCE_MAP.remove(cacheKey);
IOUtils.close(LOG, INSTANCE_MAP.remove(cacheKey));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,16 @@ public static void shutdown() {
*/
public static String createKey(OzoneBucket ozoneBucket) throws IOException {
String keyName = "key" + RandomStringUtils.randomNumeric(5);
createKey(ozoneBucket, keyName);
return keyName;
}

public static void createKey(OzoneBucket ozoneBucket, String keyName) throws IOException {
String data = "data" + RandomStringUtils.randomNumeric(5);
OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
data.length(), ReplicationType.RATIS,
OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, data.length(), ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
ozoneOutputStream.write(data.getBytes(UTF_8), 0, data.length());
ozoneOutputStream.close();
return keyName;
}

protected OzoneBucket setupBucket() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.hadoop.ozone.om;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.IOUtils;
Expand All @@ -40,6 +39,7 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
Expand All @@ -48,7 +48,6 @@
import org.apache.ozone.rocksdiff.CompactionNode;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.server.protocol.TermIndex;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -66,6 +65,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -88,23 +88,21 @@
* Tests snapshot background services.
*/
@Timeout(5000)
@Flaky("HDDS-9455")
public class TestSnapshotBackgroundServices {

private MiniOzoneHAClusterImpl cluster = null;
private MiniOzoneHAClusterImpl cluster;
private ObjectStore objectStore;
private OzoneBucket ozoneBucket;
private String volumeName;
private String bucketName;

private static final long SNAPSHOT_THRESHOLD = 50;
private static final int LOG_PURGE_GAP = 50;
// This test depends on direct RocksDB checks that are easier done with OBS
// buckets.
private static final BucketLayout TEST_BUCKET_LAYOUT =
BucketLayout.OBJECT_STORE;
private static final String SNAPSHOT_NAME_PREFIX = "snapshot";
// This test depends on direct RocksDB checks that are easier done with OBS buckets.
private static final BucketLayout TEST_BUCKET_LAYOUT = BucketLayout.OBJECT_STORE;
private static final String SNAPSHOT_NAME_PREFIX = "snapshot-";
private static final String KEY_NAME_PREFIX = "key-";
private OzoneClient client;
private final AtomicInteger counter = new AtomicInteger();

/**
* Create a MiniOzoneCluster for testing. The cluster initially has one
Expand All @@ -117,11 +115,12 @@ public void init(TestInfo testInfo) throws Exception {
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
String omServiceId = "om-service-test1";
OzoneManagerRatisServerConfig omRatisConf = conf.getObject(OzoneManagerRatisServerConfig.class);
omRatisConf.setLogAppenderWaitTimeMin(10);
conf.setFromObject(omRatisConf);
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16,
StorageUnit.KB);
conf.setStorageSize(OMConfigKeys.
OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB);
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, StorageUnit.KB);
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB);
if ("testSSTFilteringBackgroundService".equals(testInfo.getDisplayName())) {
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1,
TimeUnit.SECONDS);
Expand Down Expand Up @@ -176,12 +175,12 @@ public void init(TestInfo testInfo) throws Exception {
client = OzoneClientFactory.getRpcClient(omServiceId, conf);
objectStore = client.getObjectStore();

volumeName = "volume" + RandomStringUtils.randomNumeric(5);
bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
volumeName = "volume" + counter.incrementAndGet();
bucketName = "bucket" + counter.incrementAndGet();

VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner("user" + RandomStringUtils.randomNumeric(5))
.setAdmin("admin" + RandomStringUtils.randomNumeric(5))
.setOwner("user" + counter.incrementAndGet())
.setAdmin("admin" + counter.incrementAndGet())
.build();

objectStore.createVolume(volumeName, createVolumeArgs);
Expand Down Expand Up @@ -226,8 +225,7 @@ public void testSnapshotAndKeyDeletionBackgroundServices()
cluster.getOzoneManager(leaderOM.getOMNodeId());
assertEquals(leaderOM, newFollowerOM);

SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM,
SNAPSHOT_NAME_PREFIX + RandomStringUtils.randomNumeric(5));
SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet());

/*
Check whether newly created key data is reclaimed
Expand All @@ -252,8 +250,7 @@ public void testSnapshotAndKeyDeletionBackgroundServices()
assertNotNull(keyInfoA);

// create snapshot b
SnapshotInfo snapshotInfoB = createOzoneSnapshot(newLeaderOM,
SNAPSHOT_NAME_PREFIX + RandomStringUtils.randomNumeric(5));
SnapshotInfo snapshotInfoB = createOzoneSnapshot(newLeaderOM, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet());
assertNotNull(snapshotInfoB);

// delete key a
Expand All @@ -263,8 +260,7 @@ public void testSnapshotAndKeyDeletionBackgroundServices()
() -> !isKeyInTable(keyA, omKeyInfoTable));

// create snapshot c
SnapshotInfo snapshotInfoC = createOzoneSnapshot(newLeaderOM,
SNAPSHOT_NAME_PREFIX + RandomStringUtils.randomNumeric(5));
SnapshotInfo snapshotInfoC = createOzoneSnapshot(newLeaderOM, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet());

// get snapshot c
OmSnapshot snapC;
Expand All @@ -281,8 +277,7 @@ public void testSnapshotAndKeyDeletionBackgroundServices()
() -> isKeyInTable(keyA, snapC.getMetadataManager().getDeletedTable()));

// create snapshot d
SnapshotInfo snapshotInfoD = createOzoneSnapshot(newLeaderOM,
SNAPSHOT_NAME_PREFIX + RandomStringUtils.randomNumeric(5));
SnapshotInfo snapshotInfoD = createOzoneSnapshot(newLeaderOM, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet());

// delete snapshot c
client.getObjectStore()
Expand Down Expand Up @@ -535,18 +530,14 @@ public void testSSTFilteringBackgroundService()
private void confirmSnapDiffForTwoSnapshotsDifferingBySingleKey(
OzoneManager ozoneManager)
throws IOException, InterruptedException, TimeoutException {
String firstSnapshot = createOzoneSnapshot(ozoneManager,
TestSnapshotBackgroundServices.SNAPSHOT_NAME_PREFIX +
RandomStringUtils.randomNumeric(10)).getName();
String firstSnapshot = createOzoneSnapshot(ozoneManager, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet())
.getName();
String diffKey = writeKeys(1).get(0);
String secondSnapshot = createOzoneSnapshot(ozoneManager,
TestSnapshotBackgroundServices.SNAPSHOT_NAME_PREFIX +
RandomStringUtils.randomNumeric(10)).getName();
SnapshotDiffReportOzone diff = getSnapDiffReport(volumeName, bucketName,
firstSnapshot, secondSnapshot);
String secondSnapshot = createOzoneSnapshot(ozoneManager, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet())
.getName();
SnapshotDiffReportOzone diff = getSnapDiffReport(volumeName, bucketName, firstSnapshot, secondSnapshot);
assertEquals(Collections.singletonList(
SnapshotDiffReportOzone.getDiffReportEntry(
SnapshotDiffReport.DiffType.CREATE, diffKey, null)),
SnapshotDiffReportOzone.getDiffReportEntry(SnapshotDiffReport.DiffType.CREATE, diffKey, null)),
diff.getDiffList());
}

Expand Down Expand Up @@ -576,9 +567,7 @@ private static File getSstBackupDir(OzoneManager ozoneManager) {
private void checkIfSnapshotGetsProcessedBySFS(OzoneManager ozoneManager)
throws IOException, TimeoutException, InterruptedException {
writeKeys(1);
SnapshotInfo newSnapshot = createOzoneSnapshot(ozoneManager,
TestSnapshotBackgroundServices.SNAPSHOT_NAME_PREFIX +
RandomStringUtils.randomNumeric(5));
SnapshotInfo newSnapshot = createOzoneSnapshot(ozoneManager, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet());
assertNotNull(newSnapshot);
Table<String, SnapshotInfo> snapshotInfoTable =
ozoneManager.getMetadataManager().getSnapshotInfoTable();
Expand Down Expand Up @@ -642,22 +631,17 @@ private SnapshotDiffReportOzone getSnapDiffReport(String volume,
return response.get().getSnapshotDiffReport();
}

private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM, String name)
throws IOException {
private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM, String name) throws IOException {
objectStore.createSnapshot(volumeName, bucketName, name);

String tableKey = SnapshotInfo.getTableKey(volumeName,
bucketName,
name);
String tableKey = SnapshotInfo.getTableKey(volumeName, bucketName, name);
SnapshotInfo snapshotInfo = leaderOM.getMetadataManager()
.getSnapshotInfoTable()
.get(tableKey);
// Allow the snapshot to be written to disk
String fileName =
getSnapshotPath(leaderOM.getConfiguration(), snapshotInfo);
String fileName = getSnapshotPath(leaderOM.getConfiguration(), snapshotInfo);
File snapshotDir = new File(fileName);
if (!RDBCheckpointUtils
.waitForCheckpointDirectoryExist(snapshotDir)) {
if (!RDBCheckpointUtils.waitForCheckpointDirectoryExist(snapshotDir)) {
throw new IOException("snapshot directory doesn't exist");
}
return snapshotInfo;
Expand All @@ -667,7 +651,9 @@ private List<String> writeKeys(long keyCount) throws IOException {
List<String> keys = new ArrayList<>();
long index = 0;
while (index < keyCount) {
keys.add(createKey(ozoneBucket));
String key = KEY_NAME_PREFIX + counter.incrementAndGet();
createKey(ozoneBucket, key);
keys.add(key);
index++;
}
return keys;
Expand All @@ -681,5 +667,4 @@ private void readKeys(List<String> keys) throws IOException {
inputStream.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush)
long startTime = System.currentTimeMillis();
long pauseCounter = PAUSE_COUNTER.incrementAndGet();

// Pause compactions, Copy/link files and get checkpoint.
try {
LOG.info("Compaction pausing {} started.", pauseCounter);
// Pause compactions, Copy/link files and get checkpoint.
differ.incrementTarballRequestCount();
FileUtils.copyDirectory(compactionLogDir.getOriginalDir(),
compactionLogDir.getTmpDir());
Expand All @@ -252,13 +252,9 @@ public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush)
checkpoint = getDbStore().getCheckpoint(flush);
} finally {
// Unpause the compaction threads.
synchronized (getDbStore().getRocksDBCheckpointDiffer()) {
differ.decrementTarballRequestCount();
differ.notifyAll();
long elapsedTime = System.currentTimeMillis() - startTime;
LOG.info("Compaction pausing {} ended. Elapsed ms: {}",
pauseCounter, elapsedTime);
}
differ.decrementTarballRequestCountAndNotify();
long elapsedTime = System.currentTimeMillis() - startTime;
LOG.info("Compaction pausing {} ended. Elapsed ms: {}", pauseCounter, elapsedTime);
}
return checkpoint;
}
Expand Down