diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java index e830106e5700..5e612d8b204d 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java @@ -1421,16 +1421,21 @@ public String getCompactionLogDir() { * those are not needed to generate snapshot diff. These files are basically * non-leaf nodes of the DAG. */ - public synchronized void pruneSstFiles() { + public void pruneSstFiles() { if (!shouldRun()) { return; } Set nonLeafSstFiles; - nonLeafSstFiles = forwardCompactionDAG.nodes().stream() - .filter(node -> !forwardCompactionDAG.successors(node).isEmpty()) - .map(node -> node.getFileName()) - .collect(Collectors.toSet()); + // This is synchronized because compaction thread can update the compactionDAG and can be in situation + // when nodes are added to the graph, but arcs are still in progress. + // Hence, the lock is taken. + synchronized (this) { + nonLeafSstFiles = forwardCompactionDAG.nodes().stream() + .filter(node -> !forwardCompactionDAG.successors(node).isEmpty()) + .map(node -> node.getFileName()) + .collect(Collectors.toSet()); + } if (CollectionUtils.isNotEmpty(nonLeafSstFiles)) { LOG.info("Removing SST files: {} as part of SST file pruning.", @@ -1448,8 +1453,13 @@ public void incrementTarballRequestCount() { tarballRequestCount.incrementAndGet(); } - public void decrementTarballRequestCount() { - tarballRequestCount.decrementAndGet(); + public void decrementTarballRequestCountAndNotify() { + // Synchronized block is used to ensure that lock is on the same instance notifyAll is being called. + synchronized (this) { + tarballRequestCount.decrementAndGet(); + // Notify compaction threads to continue. + notifyAll(); + } } public boolean shouldRun() { @@ -1517,8 +1527,7 @@ public static RocksDBCheckpointDiffer getInstance( * for cache. */ public static void invalidateCacheEntry(String cacheKey) { - IOUtils.closeQuietly(INSTANCE_MAP.get(cacheKey)); - INSTANCE_MAP.remove(cacheKey); + IOUtils.close(LOG, INSTANCE_MAP.remove(cacheKey)); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index c18d1f8b17a5..132c41b2cce1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -217,13 +217,16 @@ public static void shutdown() { */ public static String createKey(OzoneBucket ozoneBucket) throws IOException { String keyName = "key" + RandomStringUtils.randomNumeric(5); + createKey(ozoneBucket, keyName); + return keyName; + } + + public static void createKey(OzoneBucket ozoneBucket, String keyName) throws IOException { String data = "data" + RandomStringUtils.randomNumeric(5); - OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, - data.length(), ReplicationType.RATIS, + OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, data.length(), ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>()); ozoneOutputStream.write(data.getBytes(UTF_8), 0, data.length()); ozoneOutputStream.close(); - return keyName; } protected OzoneBucket setupBucket() throws Exception { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java index dd0af27c950e..8f270b7c4325 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java @@ -16,7 +16,6 @@ */ package org.apache.hadoop.ozone.om; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.IOUtils; @@ -40,6 +39,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig; import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; import org.apache.hadoop.ozone.om.snapshot.SnapshotCache; import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone; @@ -48,7 +48,6 @@ import org.apache.ozone.rocksdiff.CompactionNode; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.LambdaTestUtils; -import org.apache.ozone.test.tag.Flaky; import org.apache.ratis.server.protocol.TermIndex; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -66,6 +65,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -88,10 +88,8 @@ * Tests snapshot background services. */ @Timeout(5000) -@Flaky("HDDS-9455") public class TestSnapshotBackgroundServices { - - private MiniOzoneHAClusterImpl cluster = null; + private MiniOzoneHAClusterImpl cluster; private ObjectStore objectStore; private OzoneBucket ozoneBucket; private String volumeName; @@ -99,12 +97,12 @@ public class TestSnapshotBackgroundServices { private static final long SNAPSHOT_THRESHOLD = 50; private static final int LOG_PURGE_GAP = 50; - // This test depends on direct RocksDB checks that are easier done with OBS - // buckets. - private static final BucketLayout TEST_BUCKET_LAYOUT = - BucketLayout.OBJECT_STORE; - private static final String SNAPSHOT_NAME_PREFIX = "snapshot"; + // This test depends on direct RocksDB checks that are easier done with OBS buckets. + private static final BucketLayout TEST_BUCKET_LAYOUT = BucketLayout.OBJECT_STORE; + private static final String SNAPSHOT_NAME_PREFIX = "snapshot-"; + private static final String KEY_NAME_PREFIX = "key-"; private OzoneClient client; + private final AtomicInteger counter = new AtomicInteger(); /** * Create a MiniOzoneCluster for testing. The cluster initially has one @@ -117,11 +115,12 @@ public void init(TestInfo testInfo) throws Exception { String clusterId = UUID.randomUUID().toString(); String scmId = UUID.randomUUID().toString(); String omServiceId = "om-service-test1"; + OzoneManagerRatisServerConfig omRatisConf = conf.getObject(OzoneManagerRatisServerConfig.class); + omRatisConf.setLogAppenderWaitTimeMin(10); + conf.setFromObject(omRatisConf); conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP); - conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, - StorageUnit.KB); - conf.setStorageSize(OMConfigKeys. - OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB); + conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, StorageUnit.KB); + conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB); if ("testSSTFilteringBackgroundService".equals(testInfo.getDisplayName())) { conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); @@ -176,12 +175,12 @@ public void init(TestInfo testInfo) throws Exception { client = OzoneClientFactory.getRpcClient(omServiceId, conf); objectStore = client.getObjectStore(); - volumeName = "volume" + RandomStringUtils.randomNumeric(5); - bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + volumeName = "volume" + counter.incrementAndGet(); + bucketName = "bucket" + counter.incrementAndGet(); VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() - .setOwner("user" + RandomStringUtils.randomNumeric(5)) - .setAdmin("admin" + RandomStringUtils.randomNumeric(5)) + .setOwner("user" + counter.incrementAndGet()) + .setAdmin("admin" + counter.incrementAndGet()) .build(); objectStore.createVolume(volumeName, createVolumeArgs); @@ -226,8 +225,7 @@ public void testSnapshotAndKeyDeletionBackgroundServices() cluster.getOzoneManager(leaderOM.getOMNodeId()); assertEquals(leaderOM, newFollowerOM); - SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM, - SNAPSHOT_NAME_PREFIX + RandomStringUtils.randomNumeric(5)); + SnapshotInfo newSnapshot = createOzoneSnapshot(newLeaderOM, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet()); /* Check whether newly created key data is reclaimed @@ -252,8 +250,7 @@ public void testSnapshotAndKeyDeletionBackgroundServices() assertNotNull(keyInfoA); // create snapshot b - SnapshotInfo snapshotInfoB = createOzoneSnapshot(newLeaderOM, - SNAPSHOT_NAME_PREFIX + RandomStringUtils.randomNumeric(5)); + SnapshotInfo snapshotInfoB = createOzoneSnapshot(newLeaderOM, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet()); assertNotNull(snapshotInfoB); // delete key a @@ -263,8 +260,7 @@ public void testSnapshotAndKeyDeletionBackgroundServices() () -> !isKeyInTable(keyA, omKeyInfoTable)); // create snapshot c - SnapshotInfo snapshotInfoC = createOzoneSnapshot(newLeaderOM, - SNAPSHOT_NAME_PREFIX + RandomStringUtils.randomNumeric(5)); + SnapshotInfo snapshotInfoC = createOzoneSnapshot(newLeaderOM, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet()); // get snapshot c OmSnapshot snapC; @@ -281,8 +277,7 @@ public void testSnapshotAndKeyDeletionBackgroundServices() () -> isKeyInTable(keyA, snapC.getMetadataManager().getDeletedTable())); // create snapshot d - SnapshotInfo snapshotInfoD = createOzoneSnapshot(newLeaderOM, - SNAPSHOT_NAME_PREFIX + RandomStringUtils.randomNumeric(5)); + SnapshotInfo snapshotInfoD = createOzoneSnapshot(newLeaderOM, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet()); // delete snapshot c client.getObjectStore() @@ -535,18 +530,14 @@ public void testSSTFilteringBackgroundService() private void confirmSnapDiffForTwoSnapshotsDifferingBySingleKey( OzoneManager ozoneManager) throws IOException, InterruptedException, TimeoutException { - String firstSnapshot = createOzoneSnapshot(ozoneManager, - TestSnapshotBackgroundServices.SNAPSHOT_NAME_PREFIX + - RandomStringUtils.randomNumeric(10)).getName(); + String firstSnapshot = createOzoneSnapshot(ozoneManager, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet()) + .getName(); String diffKey = writeKeys(1).get(0); - String secondSnapshot = createOzoneSnapshot(ozoneManager, - TestSnapshotBackgroundServices.SNAPSHOT_NAME_PREFIX + - RandomStringUtils.randomNumeric(10)).getName(); - SnapshotDiffReportOzone diff = getSnapDiffReport(volumeName, bucketName, - firstSnapshot, secondSnapshot); + String secondSnapshot = createOzoneSnapshot(ozoneManager, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet()) + .getName(); + SnapshotDiffReportOzone diff = getSnapDiffReport(volumeName, bucketName, firstSnapshot, secondSnapshot); assertEquals(Collections.singletonList( - SnapshotDiffReportOzone.getDiffReportEntry( - SnapshotDiffReport.DiffType.CREATE, diffKey, null)), + SnapshotDiffReportOzone.getDiffReportEntry(SnapshotDiffReport.DiffType.CREATE, diffKey, null)), diff.getDiffList()); } @@ -576,9 +567,7 @@ private static File getSstBackupDir(OzoneManager ozoneManager) { private void checkIfSnapshotGetsProcessedBySFS(OzoneManager ozoneManager) throws IOException, TimeoutException, InterruptedException { writeKeys(1); - SnapshotInfo newSnapshot = createOzoneSnapshot(ozoneManager, - TestSnapshotBackgroundServices.SNAPSHOT_NAME_PREFIX + - RandomStringUtils.randomNumeric(5)); + SnapshotInfo newSnapshot = createOzoneSnapshot(ozoneManager, SNAPSHOT_NAME_PREFIX + counter.incrementAndGet()); assertNotNull(newSnapshot); Table snapshotInfoTable = ozoneManager.getMetadataManager().getSnapshotInfoTable(); @@ -642,22 +631,17 @@ private SnapshotDiffReportOzone getSnapDiffReport(String volume, return response.get().getSnapshotDiffReport(); } - private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM, String name) - throws IOException { + private SnapshotInfo createOzoneSnapshot(OzoneManager leaderOM, String name) throws IOException { objectStore.createSnapshot(volumeName, bucketName, name); - String tableKey = SnapshotInfo.getTableKey(volumeName, - bucketName, - name); + String tableKey = SnapshotInfo.getTableKey(volumeName, bucketName, name); SnapshotInfo snapshotInfo = leaderOM.getMetadataManager() .getSnapshotInfoTable() .get(tableKey); // Allow the snapshot to be written to disk - String fileName = - getSnapshotPath(leaderOM.getConfiguration(), snapshotInfo); + String fileName = getSnapshotPath(leaderOM.getConfiguration(), snapshotInfo); File snapshotDir = new File(fileName); - if (!RDBCheckpointUtils - .waitForCheckpointDirectoryExist(snapshotDir)) { + if (!RDBCheckpointUtils.waitForCheckpointDirectoryExist(snapshotDir)) { throw new IOException("snapshot directory doesn't exist"); } return snapshotInfo; @@ -667,7 +651,9 @@ private List writeKeys(long keyCount) throws IOException { List keys = new ArrayList<>(); long index = 0; while (index < keyCount) { - keys.add(createKey(ozoneBucket)); + String key = KEY_NAME_PREFIX + counter.incrementAndGet(); + createKey(ozoneBucket, key); + keys.add(key); index++; } return keys; @@ -681,5 +667,4 @@ private void readKeys(List keys) throws IOException { inputStream.close(); } } - } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java index 2a7771fe60a3..c39e823d3e55 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java @@ -241,9 +241,9 @@ public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush) long startTime = System.currentTimeMillis(); long pauseCounter = PAUSE_COUNTER.incrementAndGet(); - // Pause compactions, Copy/link files and get checkpoint. try { LOG.info("Compaction pausing {} started.", pauseCounter); + // Pause compactions, Copy/link files and get checkpoint. differ.incrementTarballRequestCount(); FileUtils.copyDirectory(compactionLogDir.getOriginalDir(), compactionLogDir.getTmpDir()); @@ -252,13 +252,9 @@ public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush) checkpoint = getDbStore().getCheckpoint(flush); } finally { // Unpause the compaction threads. - synchronized (getDbStore().getRocksDBCheckpointDiffer()) { - differ.decrementTarballRequestCount(); - differ.notifyAll(); - long elapsedTime = System.currentTimeMillis() - startTime; - LOG.info("Compaction pausing {} ended. Elapsed ms: {}", - pauseCounter, elapsedTime); - } + differ.decrementTarballRequestCountAndNotify(); + long elapsedTime = System.currentTimeMillis() - startTime; + LOG.info("Compaction pausing {} ended. Elapsed ms: {}", pauseCounter, elapsedTime); } return checkpoint; }