diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java index 1933925384fc..3364939f8bcf 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java @@ -574,7 +574,6 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() DirectoryDeletingService dirDeletingService = cluster.getOzoneManager().getKeyManager().getDirDeletingService(); // Suspend KeyDeletingService dirDeletingService.suspend(); - GenericTestUtils.waitFor(() -> !dirDeletingService.isRunningOnAOS(), 1000, 10000); Random random = new Random(); final String testVolumeName = "volume" + random.nextInt(); final String testBucketName = "bucket" + random.nextInt(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java similarity index 60% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java index 48d1c2f978b3..080844d1f5f7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingServiceIntegrationTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hadoop.ozone.om.snapshot; +package org.apache.hadoop.ozone.om.service; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; @@ -23,35 +23,34 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; -import java.util.Collections; +import java.util.Arrays; import java.util.Deque; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.Random; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.IOUtils; @@ -63,12 +62,11 @@ import org.apache.hadoop.ozone.client.BucketArgs; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.om.KeyManager; +import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; -import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.SnapshotChainManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -76,22 +74,25 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; -import org.apache.hadoop.ozone.om.service.KeyDeletingService; -import org.apache.hadoop.ozone.om.service.SnapshotDeletingService; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks; +import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; +import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.tag.Flaky; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestMethodOrder; -import org.mockito.Mockito; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.MockedConstruction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,6 +162,10 @@ public void closeAllSnapshots() { while (!rcSnaps.isEmpty()) { rcSnaps.pop().close(); } + // Resume services + om.getKeyManager().getDirDeletingService().resume(); + om.getKeyManager().getDeletingService().resume(); + om.getKeyManager().getSnapshotDeletingService().resume(); } private UncheckedAutoCloseableSupplier getOmSnapshot(String volume, String bucket, String snapshotName) @@ -532,233 +537,6 @@ public void testSnapshotWithFSO() throws Exception { snap1 = null; } - private DirectoryDeletingService getMockedDirectoryDeletingService(AtomicBoolean dirDeletionWaitStarted, - AtomicBoolean dirDeletionStarted) - throws InterruptedException, TimeoutException, IOException { - OzoneManager ozoneManager = Mockito.spy(om); - om.getKeyManager().getDirDeletingService().shutdown(); - KeyManager keyManager = Mockito.spy(om.getKeyManager()); - when(ozoneManager.getKeyManager()).thenReturn(keyManager); - GenericTestUtils.waitFor(() -> om.getKeyManager().getDirDeletingService().getThreadCount() == 0, 1000, - 100000); - DirectoryDeletingService directoryDeletingService = Mockito.spy(new DirectoryDeletingService(10000, - TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf(), 1, false)); - directoryDeletingService.shutdown(); - GenericTestUtils.waitFor(() -> directoryDeletingService.getThreadCount() == 0, 1000, - 100000); - doAnswer(i -> { - // Wait for SDS to reach DDS wait block before processing any deleted directories. - GenericTestUtils.waitFor(dirDeletionWaitStarted::get, 1000, 100000); - dirDeletionStarted.set(true); - return i.callRealMethod(); - }).when(keyManager).getDeletedDirEntries(); - return directoryDeletingService; - } - - private KeyDeletingService getMockedKeyDeletingService(AtomicBoolean keyDeletionWaitStarted, - AtomicBoolean keyDeletionStarted) - throws InterruptedException, TimeoutException, IOException { - OzoneManager ozoneManager = Mockito.spy(om); - om.getKeyManager().getDeletingService().shutdown(); - GenericTestUtils.waitFor(() -> om.getKeyManager().getDeletingService().getThreadCount() == 0, 1000, - 100000); - KeyManager keyManager = Mockito.spy(om.getKeyManager()); - when(ozoneManager.getKeyManager()).thenReturn(keyManager); - KeyDeletingService keyDeletingService = Mockito.spy(new KeyDeletingService(ozoneManager, - ozoneManager.getScmClient().getBlockClient(), 10000, - 100000, cluster.getConf(), 10, false)); - keyDeletingService.shutdown(); - GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0, 1000, - 100000); - when(keyManager.getPendingDeletionKeys(any(), anyInt())).thenAnswer(i -> { - // wait for SDS to reach the KDS wait block before processing any key. - GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000); - keyDeletionStarted.set(true); - return i.callRealMethod(); - }); - return keyDeletingService; - } - - @SuppressWarnings("checkstyle:parameternumber") - private SnapshotDeletingService getMockedSnapshotDeletingService(KeyDeletingService keyDeletingService, - DirectoryDeletingService directoryDeletingService, - AtomicBoolean snapshotDeletionStarted, - AtomicBoolean keyDeletionWaitStarted, - AtomicBoolean dirDeletionWaitStarted, - AtomicBoolean keyDeletionStarted, - AtomicBoolean dirDeletionStarted, - OzoneBucket testBucket) - throws InterruptedException, TimeoutException, IOException { - OzoneManager ozoneManager = Mockito.spy(om); - om.getKeyManager().getSnapshotDeletingService().shutdown(); - GenericTestUtils.waitFor(() -> om.getKeyManager().getSnapshotDeletingService().getThreadCount() == 0, 1000, - 100000); - KeyManager keyManager = Mockito.spy(om.getKeyManager()); - OmMetadataManagerImpl omMetadataManager = Mockito.spy((OmMetadataManagerImpl)om.getMetadataManager()); - SnapshotChainManager unMockedSnapshotChainManager = - ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(); - SnapshotChainManager snapshotChainManager = Mockito.spy(unMockedSnapshotChainManager); - OmSnapshotManager omSnapshotManager = Mockito.spy(om.getOmSnapshotManager()); - when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager); - when(ozoneManager.getKeyManager()).thenReturn(keyManager); - when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); - when(omMetadataManager.getSnapshotChainManager()).thenReturn(snapshotChainManager); - when(keyManager.getDeletingService()).thenReturn(keyDeletingService); - when(keyManager.getDirDeletingService()).thenReturn(directoryDeletingService); - SnapshotDeletingService snapshotDeletingService = Mockito.spy(new SnapshotDeletingService(10000, - 100000, ozoneManager)); - snapshotDeletingService.shutdown(); - GenericTestUtils.waitFor(() -> snapshotDeletingService.getThreadCount() == 0, 1000, - 100000); - when(snapshotChainManager.iterator(anyBoolean())).thenAnswer(i -> { - Iterator itr = (Iterator) i.callRealMethod(); - return Lists.newArrayList(itr).stream().filter(uuid -> { - try { - SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(om, snapshotChainManager, uuid); - return snapshotInfo.getBucketName().equals(testBucket.getName()) && - snapshotInfo.getVolumeName().equals(testBucket.getVolumeName()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }).iterator(); - }); - when(snapshotChainManager.getLatestGlobalSnapshotId()) - .thenAnswer(i -> unMockedSnapshotChainManager.getLatestGlobalSnapshotId()); - when(snapshotChainManager.getOldestGlobalSnapshotId()) - .thenAnswer(i -> unMockedSnapshotChainManager.getOldestGlobalSnapshotId()); - doAnswer(i -> { - // KDS wait block reached in SDS. - GenericTestUtils.waitFor(() -> { - return keyDeletingService.isRunningOnAOS(); - }, 1000, 100000); - keyDeletionWaitStarted.set(true); - return i.callRealMethod(); - }).when(snapshotDeletingService).waitForKeyDeletingService(); - doAnswer(i -> { - // DDS wait block reached in SDS. - GenericTestUtils.waitFor(directoryDeletingService::isRunningOnAOS, 1000, 100000); - dirDeletionWaitStarted.set(true); - return i.callRealMethod(); - }).when(snapshotDeletingService).waitForDirDeletingService(); - doAnswer(i -> { - // Assert KDS & DDS is not running when SDS starts moving entries & assert all wait block, KDS processing - // AOS block & DDS AOS block have been executed. - Assertions.assertTrue(keyDeletionWaitStarted.get()); - Assertions.assertTrue(dirDeletionWaitStarted.get()); - Assertions.assertTrue(keyDeletionStarted.get()); - Assertions.assertTrue(dirDeletionStarted.get()); - Assertions.assertFalse(keyDeletingService.isRunningOnAOS()); - Assertions.assertFalse(directoryDeletingService.isRunningOnAOS()); - snapshotDeletionStarted.set(true); - return i.callRealMethod(); - }).when(omSnapshotManager).getSnapshot(anyString(), anyString(), anyString()); - return snapshotDeletingService; - } - - @Test - @Order(4) - @Flaky("HDDS-11847") - public void testParallelExcecutionOfKeyDeletionAndSnapshotDeletion() throws Exception { - AtomicBoolean keyDeletionWaitStarted = new AtomicBoolean(false); - AtomicBoolean dirDeletionWaitStarted = new AtomicBoolean(false); - AtomicBoolean keyDeletionStarted = new AtomicBoolean(false); - AtomicBoolean dirDeletionStarted = new AtomicBoolean(false); - AtomicBoolean snapshotDeletionStarted = new AtomicBoolean(false); - Random random = new Random(); - String bucketName = "bucket" + random.nextInt(); - BucketArgs bucketArgs = new BucketArgs.Builder() - .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED) - .build(); - OzoneBucket testBucket = TestDataUtil.createBucket( - client, VOLUME_NAME, bucketArgs, bucketName); - // mock keyDeletingService - KeyDeletingService keyDeletingService = getMockedKeyDeletingService(keyDeletionWaitStarted, keyDeletionStarted); - - // mock dirDeletingService - DirectoryDeletingService directoryDeletingService = getMockedDirectoryDeletingService(dirDeletionWaitStarted, - dirDeletionStarted); - - // mock snapshotDeletingService. - SnapshotDeletingService snapshotDeletingService = getMockedSnapshotDeletingService(keyDeletingService, - directoryDeletingService, snapshotDeletionStarted, keyDeletionWaitStarted, dirDeletionWaitStarted, - keyDeletionStarted, dirDeletionStarted, testBucket); - createSnapshotFSODataForBucket(testBucket); - List> renamesKeyEntries; - List>> deletedKeyEntries; - List> deletedDirEntries; - try (UncheckedAutoCloseableSupplier snapshot = - om.getOmSnapshotManager().getSnapshot(testBucket.getVolumeName(), testBucket.getName(), - testBucket.getName() + "snap2")) { - renamesKeyEntries = snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", (kv) -> true, 1000); - deletedKeyEntries = snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", (kv) -> true, 1000); - deletedDirEntries = snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(), - testBucket.getName(), 1000); - } - Thread keyDeletingThread = new Thread(() -> { - try { - keyDeletingService.runPeriodicalTaskNow(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - Thread directoryDeletingThread = new Thread(() -> { - try { - directoryDeletingService.runPeriodicalTaskNow(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - ExecutorService snapshotDeletingThread = Executors.newFixedThreadPool(1); - Runnable snapshotDeletionRunnable = () -> { - try { - snapshotDeletingService.runPeriodicalTaskNow(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - keyDeletingThread.start(); - directoryDeletingThread.start(); - Future future = snapshotDeletingThread.submit(snapshotDeletionRunnable); - GenericTestUtils.waitFor(snapshotDeletionStarted::get, 1000, 30000); - future.get(); - try (UncheckedAutoCloseableSupplier snapshot = - om.getOmSnapshotManager().getSnapshot(testBucket.getVolumeName(), testBucket.getName(), - testBucket.getName() + "snap2")) { - Assertions.assertEquals(Collections.emptyList(), - snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", (kv) -> true, 1000)); - Assertions.assertEquals(Collections.emptyList(), - snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", (kv) -> true, 1000)); - Assertions.assertEquals(Collections.emptyList(), - snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(), - testBucket.getName(), 1000)); - } - List> aosRenamesKeyEntries = - om.getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", (kv) -> true, 1000); - List>> aosDeletedKeyEntries = - om.getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", (kv) -> true, 1000); - List> aosDeletedDirEntries = - om.getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(), - testBucket.getName(), 1000); - renamesKeyEntries.forEach(entry -> Assertions.assertTrue(aosRenamesKeyEntries.contains(entry))); - deletedKeyEntries.forEach(entry -> Assertions.assertTrue(aosDeletedKeyEntries.contains(entry))); - deletedDirEntries.forEach(entry -> Assertions.assertTrue(aosDeletedDirEntries.contains(entry))); - Mockito.reset(snapshotDeletingService); - SnapshotInfo snap2 = SnapshotUtils.getSnapshotInfo(om, testBucket.getVolumeName(), - testBucket.getName(), testBucket.getName() + "snap2"); - Assertions.assertEquals(snap2.getSnapshotStatus(), SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED); - future = snapshotDeletingThread.submit(snapshotDeletionRunnable); - future.get(); - Assertions.assertThrows(IOException.class, () -> SnapshotUtils.getSnapshotInfo(om, testBucket.getVolumeName(), - testBucket.getName(), testBucket.getName() + "snap2")); - cluster.restartOzoneManager(); - } - /* Flow ---- @@ -846,74 +624,166 @@ private synchronized void createSnapshotDataForBucket(OzoneBucket bucket) throws bucket.getName())); } - /* - Flow - ---- - create dir0/key0 - create dir1/key1 - overwrite dir0/key0 - create dir2/key2 - create snap1 - rename dir1/key1 -> dir1/key10 - delete dir1/key10 - delete dir2 - create snap2 - delete snap2 - */ - private synchronized void createSnapshotFSODataForBucket(OzoneBucket bucket) throws Exception { - Table snapshotInfoTable = - om.getMetadataManager().getSnapshotInfoTable(); - Table deletedTable = - om.getMetadataManager().getDeletedTable(); - Table deletedDirTable = - om.getMetadataManager().getDeletedDirTable(); - Table keyTable = - om.getMetadataManager().getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED); - Table dirTable = - om.getMetadataManager().getDirectoryTable(); - Table renameTable = om.getMetadataManager().getSnapshotRenamedTable(); - OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl) - om.getMetadataManager(); - Map countMap = - metadataManager.listTables().entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> { - try { - return (int)metadataManager.countRowsInTable(e.getValue()); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - })); - TestDataUtil.createKey(bucket, "dir0/" + bucket.getName() + "key0", CONTENT.array()); - TestDataUtil.createKey(bucket, "dir1/" + bucket.getName() + "key1", CONTENT.array()); - assertTableRowCount(keyTable, countMap.get(keyTable.getName()) + 2); - assertTableRowCount(dirTable, countMap.get(dirTable.getName()) + 2); + private MockedConstruction getMockedReclaimableKeyFilter(String volume, String bucket, + AtomicBoolean kdsWaitStarted, AtomicBoolean sdsLockWaitStarted, + AtomicBoolean sdsLockAcquired, AtomicBoolean kdsFinished, ReclaimableKeyFilter keyFilter) throws IOException { + + return mockConstruction(ReclaimableKeyFilter.class, + (mocked, context) -> { + when(mocked.apply(any())).thenAnswer(i -> { + Table.KeyValue keyInfo = i.getArgument(0); + if (!keyInfo.getValue().getVolumeName().equals(volume) || + !keyInfo.getValue().getBucketName().equals(bucket)) { + return keyFilter.apply(i.getArgument(0)); + } + keyFilter.apply(i.getArgument(0)); + //Notify SDS that Kds has started for the bucket. + kdsWaitStarted.set(true); + GenericTestUtils.waitFor(sdsLockWaitStarted::get, 1000, 10000); + // Wait for 1 more second so that the command moves to lock wait. + Thread.sleep(1000); + return keyFilter.apply(i.getArgument(0)); + }); + doAnswer(i -> { + assertTrue(sdsLockWaitStarted.get()); + assertFalse(sdsLockAcquired.get()); + kdsFinished.set(true); + keyFilter.close(); + return null; + }).when(mocked).close(); + when(mocked.getExclusiveReplicatedSizeMap()).thenAnswer(i -> keyFilter.getExclusiveReplicatedSizeMap()); + when(mocked.getExclusiveSizeMap()).thenAnswer(i -> keyFilter.getExclusiveSizeMap()); + }); + } - // Overwrite bucket1key0, This is a newer version of the key which should - // reclaimed as this is a different version of the key. - TestDataUtil.createKey(bucket, "dir0/" + bucket.getName() + "key0", CONTENT.array()); - TestDataUtil.createKey(bucket, "dir2/" + bucket.getName() + "key2", CONTENT.array()); - assertTableRowCount(keyTable, countMap.get(keyTable.getName()) + 3); - assertTableRowCount(dirTable, countMap.get(dirTable.getName()) + 3); - assertTableRowCount(deletedTable, countMap.get(deletedTable.getName()) + 1); - // create snap1 - client.getProxy().createSnapshot(bucket.getVolumeName(), bucket.getName(), - bucket.getName() + "snap1"); - bucket.renameKey("dir1/" + bucket.getName() + "key1", "dir1/" + bucket.getName() + "key10"); - bucket.renameKey("dir1/", "dir10/"); - assertTableRowCount(renameTable, countMap.get(renameTable.getName()) + 2); - client.getProxy().deleteKey(bucket.getVolumeName(), bucket.getName(), - "dir10/" + bucket.getName() + "key10", false); - assertTableRowCount(deletedTable, countMap.get(deletedTable.getName()) + 1); - // Key 2 is deleted here, which will be reclaimed here as - // it is not being referenced by previous snapshot. - client.getProxy().deleteKey(bucket.getVolumeName(), bucket.getName(), "dir2", true); - assertTableRowCount(deletedDirTable, countMap.get(deletedDirTable.getName()) + 1); - client.getProxy().createSnapshot(bucket.getVolumeName(), bucket.getName(), - bucket.getName() + "snap2"); - // Delete Snapshot 2. - client.getProxy().deleteSnapshot(bucket.getVolumeName(), bucket.getName(), - bucket.getName() + "snap2"); - assertTableRowCount(snapshotInfoTable, countMap.get(snapshotInfoTable.getName()) + 2); + @ParameterizedTest + @CsvSource({"true, 0", "true, 1", "false, 0", "false, 1", "false, 2"}) + @DisplayName("Tests Snapshot Deleting Service while KeyDeletingService is already running.") + @Order(4) + public void testSnapshotDeletingServiceWaitsForKeyDeletingService(boolean kdsRunningOnAOS, + int snasphotDeleteIndex) throws Exception { + SnapshotChainManager snapshotChainManager = + ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(); + GenericTestUtils.waitFor(() -> { + try { + Iterator itr = snapshotChainManager.iterator(false); + while (itr.hasNext()) { + SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(om, snapshotChainManager, itr.next()); + assertEquals(SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE, snapshotInfo.getSnapshotStatus()); + } + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } + }, 1000, 30000); + om.awaitDoubleBufferFlush(); + // Suspend the services first + om.getKeyManager().getDirDeletingService().suspend(); + om.getKeyManager().getDeletingService().suspend(); + om.getKeyManager().getSnapshotDeletingService().suspend(); + String volume = "vol" + RandomStringUtils.secure().nextNumeric(3), + bucket = "bucket" + RandomStringUtils.secure().nextNumeric(3); + client.getObjectStore().createVolume(volume); + OzoneVolume ozoneVolume = client.getObjectStore().getVolume(volume); + ozoneVolume.createBucket(bucket); + OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucket); + + // Create snap0 + client.getObjectStore().createSnapshot(volume, bucket, "snap0"); + client.getObjectStore().getSnapshotInfo(volume, bucket, "snap0"); + UUID snap1Id = client.getObjectStore().getSnapshotInfo(volume, bucket, "snap0").getSnapshotId(); + + // Create snap1 + TestDataUtil.createKey(ozoneBucket, "key", CONTENT.array()); + client.getObjectStore().createSnapshot(volume, bucket, "snap1"); + UUID snap2Id = client.getObjectStore().getSnapshotInfo(volume, bucket, "snap1").getSnapshotId(); + + ozoneBucket.renameKey("key", "renamedKey"); + ozoneBucket.deleteKey("renamedKey"); + om.awaitDoubleBufferFlush(); + UUID snap3Id; + ReclaimableKeyFilter keyFilter; + SnapshotInfo snapInfo; + // Create snap3 to test snapshot 3 deep cleaning otherwise just run on AOS. + if (kdsRunningOnAOS) { + snap3Id = null; + snapInfo = null; + keyFilter = new ReclaimableKeyFilter(om, om.getOmSnapshotManager(), + ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), + snapInfo, om.getKeyManager(), om.getMetadataManager().getLock()); + } else { + + client.getObjectStore().createSnapshot(volume, bucket, "snap2"); + snap3Id = client.getObjectStore().getSnapshotInfo(volume, bucket, "snap2").getSnapshotId(); + om.awaitDoubleBufferFlush(); + SnapshotInfo snap = om.getMetadataManager().getSnapshotInfo(volume, bucket, "snap2"); + snap.setDeepCleanedDeletedDir(true); + om.getMetadataManager().getSnapshotInfoTable().put(snap.getTableKey(), snap); + assertTrue(om.getMetadataManager().getSnapshotInfo(volume, bucket, "snap2") + .isDeepCleanedDeletedDir()); + snapInfo = SnapshotUtils.getSnapshotInfo(om, volume, bucket, "snap2"); + keyFilter = new ReclaimableKeyFilter(om, om.getOmSnapshotManager(), + ((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(), + snapInfo, getOmSnapshot(volume, bucket, "snap2").get().getKeyManager(), + om.getMetadataManager().getLock()); + } + + + MultiSnapshotLocks sdsMultiLocks = new MultiSnapshotLocks(cluster.getOzoneManager().getMetadataManager().getLock(), + SNAPSHOT_GC_LOCK, true); + AtomicBoolean kdsWaitStarted = new AtomicBoolean(false); + AtomicBoolean kdsFinished = new AtomicBoolean(false); + AtomicBoolean sdsLockWaitStarted = new AtomicBoolean(false); + AtomicBoolean sdsLockAcquired = new AtomicBoolean(false); + + try (MockedConstruction mockedMultiSnapshotLock = mockConstruction(MultiSnapshotLocks.class, + (mocked, context) -> { + when(mocked.acquireLock(anyList())).thenAnswer(i -> { + List ids = i.getArgument(0); + List expectedIds = Arrays.asList(snap1Id, snap2Id, snap3Id).subList(snasphotDeleteIndex, Math.min(3, + snasphotDeleteIndex + 2)).stream().filter(Objects::nonNull).collect(Collectors.toList()); + if (expectedIds.equals(ids) && !sdsLockWaitStarted.get() && !sdsLockAcquired.get()) { + sdsLockWaitStarted.set(true); + OMLockDetails lockDetails = sdsMultiLocks.acquireLock(ids); + assertTrue(kdsFinished::get); + sdsLockAcquired.set(true); + return lockDetails; + } + return sdsMultiLocks.acquireLock(ids); + }); + doAnswer(i -> { + sdsMultiLocks.releaseLock(); + return null; + }).when(mocked).releaseLock(); + })) { + KeyDeletingService kds = new KeyDeletingService(om, om.getScmClient().getBlockClient(), 500, 10000, + om.getConfiguration(), 1, true); + kds.shutdown(); + KeyDeletingService.KeyDeletingTask task = kds.new KeyDeletingTask(snap3Id); + + CompletableFuture.supplyAsync(() -> { + try (MockedConstruction mockedReclaimableFilter = getMockedReclaimableKeyFilter( + volume, bucket, kdsWaitStarted, sdsLockWaitStarted, sdsLockAcquired, kdsFinished, keyFilter)) { + return task.call(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + SnapshotDeletingService sds = new SnapshotDeletingService(500, 10000, om); + sds.shutdown(); + GenericTestUtils.waitFor(kdsWaitStarted::get, 1000, 30000); + client.getObjectStore().deleteSnapshot(volume, bucket, "snap" + snasphotDeleteIndex); + sds.runPeriodicalTaskNow(); + om.awaitDoubleBufferFlush(); + if (snasphotDeleteIndex == 2) { + sds.runPeriodicalTaskNow(); + } + assertTrue(sdsLockWaitStarted.get()); + assertTrue(sdsLockAcquired.get()); + assertThrows(IOException.class, () -> SnapshotUtils.getSnapshotInfo(om, volume, bucket, + "snap" + snasphotDeleteIndex)); + } } private void verifySnapshotChain(SnapshotInfo deletedSnapshot, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index f96099323d54..28668eb68662 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -39,7 +39,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -149,7 +148,6 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { // from parent directory info from deleted directory table concurrently // and send deletion requests. private int ratisByteLimit; - private final AtomicBoolean isRunningOnAOS; private final SnapshotChainManager snapshotChainManager; private final boolean deepCleanSnapshots; private final ExecutorService deletionThreadPool; @@ -173,7 +171,6 @@ public DirectoryDeletingService(long interval, TimeUnit unit, // always go to 90% of max limit for request as other header will be added this.ratisByteLimit = (int) (limit * 0.9); - this.isRunningOnAOS = new AtomicBoolean(false); registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(), configuration); this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager(); this.deepCleanSnapshots = deepCleanSnapshots; @@ -200,14 +197,10 @@ private synchronized void updateAndRestart(OzoneConfiguration conf) { start(); } - public boolean isRunningOnAOS() { - return isRunningOnAOS.get(); - } - @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); - queue.add(new DirDeletingTask(this, null)); + queue.add(new DirDeletingTask(null)); if (deepCleanSnapshots) { Iterator iterator = null; try { @@ -218,7 +211,7 @@ public BackgroundTaskQueue getTasks() { } while (iterator.hasNext()) { UUID snapshotId = iterator.next(); - queue.add(new DirDeletingTask(this, snapshotId)); + queue.add(new DirDeletingTask(snapshotId)); } } return queue; @@ -468,11 +461,9 @@ private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List= 0, OZONE_KEY_DELETING_LIMIT_PER_TASK + " cannot be negative."); this.deletedKeyCount = new AtomicLong(0); - this.isRunningOnAOS = new AtomicBoolean(false); this.deepCleanSnapshots = deepCleanSnapshots; this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager(); this.scmClient = scmClient; @@ -119,10 +116,6 @@ public AtomicLong getDeletedKeyCount() { return deletedKeyCount; } - public boolean isRunningOnAOS() { - return isRunningOnAOS.get(); - } - Pair processKeyDeletes(List keyBlocksList, Map keysToModify, List renameEntries, String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException { @@ -261,7 +254,7 @@ private Pair submitPurgeKeysRequest(List iterator = null; try { @@ -272,7 +265,7 @@ public BackgroundTaskQueue getTasks() { } while (iterator.hasNext()) { UUID snapshotId = iterator.next(); - queue.add(new KeyDeletingTask(this, snapshotId)); + queue.add(new KeyDeletingTask(snapshotId)); } } return queue; @@ -295,11 +288,9 @@ public void setKeyLimitPerTask(int keyLimitPerTask) { */ @VisibleForTesting final class KeyDeletingTask implements BackgroundTask { - private final KeyDeletingService deletingService; private final UUID snapshotId; - KeyDeletingTask(KeyDeletingService service, UUID snapshotId) { - this.deletingService = service; + KeyDeletingTask(UUID snapshotId) { this.snapshotId = snapshotId; } @@ -427,7 +418,6 @@ public BackgroundTaskResult call() { final long run = getRunCount().incrementAndGet(); if (snapshotId == null) { LOG.debug("Running KeyDeletingService for active object store, {}", run); - isRunningOnAOS.set(true); } else { LOG.debug("Running KeyDeletingService for snapshot : {}, {}", snapshotId, run); } @@ -464,13 +454,6 @@ public BackgroundTaskResult call() { } catch (IOException e) { LOG.error("Error while running delete files background task for store {}. Will retry at next run.", snapInfo, e); - } finally { - if (snapshotId == null) { - isRunningOnAOS.set(false); - synchronized (deletingService) { - this.deletingService.notify(); - } - } } } // By design, no one cares about the results of this call back. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index 96ae98a19b6b..75e9a20cdf12 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; @@ -52,6 +53,8 @@ import org.apache.hadoop.ozone.om.SnapshotChainManager; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks; import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos; @@ -86,7 +89,8 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService { private final int keyLimitPerTask; private final int snapshotDeletionPerTask; private final int ratisByteLimit; - private final long serviceTimeout; + private final MultiSnapshotLocks snapshotIdLocks; + private final List lockIds; public SnapshotDeletingService(long interval, long serviceTimeout, OzoneManager ozoneManager) @@ -112,32 +116,9 @@ public SnapshotDeletingService(long interval, long serviceTimeout, this.keyLimitPerTask = conf.getInt( OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK, OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); - this.serviceTimeout = serviceTimeout; - } - - // Wait for a notification from KeyDeletingService if the key deletion is running. This is to ensure, merging of - // entries do not start while the AOS is still processing the deleted keys. - @VisibleForTesting - public void waitForKeyDeletingService() throws InterruptedException { - KeyDeletingService keyDeletingService = getOzoneManager().getKeyManager().getDeletingService(); - synchronized (keyDeletingService) { - while (keyDeletingService.isRunningOnAOS()) { - keyDeletingService.wait(serviceTimeout); - } - } - } - - // Wait for a notification from DirectoryDeletingService if the directory deletion is running. This is to ensure, - // merging of entries do not start while the AOS is still processing the deleted keys. - @VisibleForTesting - public void waitForDirDeletingService() throws InterruptedException { - DirectoryDeletingService directoryDeletingService = getOzoneManager().getKeyManager() - .getDirDeletingService(); - synchronized (directoryDeletingService) { - while (directoryDeletingService.isRunningOnAOS()) { - directoryDeletingService.wait(serviceTimeout); - } - } + IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock(); + this.snapshotIdLocks = new MultiSnapshotLocks(lock, SNAPSHOT_GC_LOCK, true); + this.lockIds = new ArrayList<>(2); } private class SnapshotDeletingTask implements BackgroundTask { @@ -173,16 +154,22 @@ public BackgroundTaskResult call() throws InterruptedException { snapInfo.getTableKey()); continue; } - // nextSnapshot = null means entries would be moved to AOS. if (nextSnapshot == null) { LOG.info("Snapshot: {} entries will be moved to AOS.", snapInfo.getTableKey()); - waitForKeyDeletingService(); - waitForDirDeletingService(); } else { LOG.info("Snapshot: {} entries will be moved to next active snapshot: {}", snapInfo.getTableKey(), nextSnapshot.getTableKey()); } + lockIds.clear(); + lockIds.add(snapInfo.getSnapshotId()); + if (nextSnapshot != null) { + lockIds.add(nextSnapshot.getSnapshotId()); + } + // Acquire write lock on current snapshot and next snapshot in chain. + if (!snapshotIdLocks.acquireLock(lockIds).isLockAcquired()) { + continue; + } try (UncheckedAutoCloseableSupplier snapshot = omSnapshotManager.getSnapshot( snapInfo.getVolumeName(), snapInfo.getBucketName(), snapInfo.getName())) { KeyManager snapshotKeyManager = snapshot.get().getKeyManager(); @@ -229,6 +216,8 @@ public BackgroundTaskResult call() throws InterruptedException { } else { snapshotsToBePurged.add(snapInfo.getTableKey()); } + } finally { + snapshotIdLocks.releaseLock(); } successRunCount.incrementAndGet(); snapshotLimit--; diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index b3178580b5c9..674c78d16aec 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -347,7 +347,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() keyDeletingService.suspend(); SnapshotDeletingService snapshotDeletingService = om.getKeyManager().getSnapshotDeletingService(); snapshotDeletingService.suspend(); - GenericTestUtils.waitFor(() -> !keyDeletingService.isRunningOnAOS(), 1000, 10000); + final String volumeName = getTestName(); final String bucketName = uniqueObjectName("bucket"); OzoneManager ozoneManager = Mockito.spy(om); @@ -618,7 +618,7 @@ public void testKeyDeletingServiceWithDeepCleanedSnapshots() throws Exception { when(kds.getTasks()).thenAnswer(i -> { BackgroundTaskQueue queue = new BackgroundTaskQueue(); for (UUID id : snapshotIds) { - queue.add(kds.new KeyDeletingTask(kds, id)); + queue.add(kds.new KeyDeletingTask(id)); } return queue; });