diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/BootstrapStateHandler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/BootstrapStateHandler.java new file mode 100644 index 000000000000..b1f04bd4c32b --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/BootstrapStateHandler.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.lock; + +import java.util.concurrent.Semaphore; + +/** Bootstrap state handler interface. */ +public interface BootstrapStateHandler { + Lock getBootstrapStateLock(); + + /** Bootstrap state handler lock implementation. */ + class Lock implements AutoCloseable { + private final Semaphore semaphore = new Semaphore(1); + public Lock lock() throws InterruptedException { + semaphore.acquire(); + return this; + } + public void unlock() { + semaphore.release(); + } + public void close() { + unlock(); + } + } +} diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java index 2bea23e9a7fb..c17e4008ad28 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java @@ -67,6 +67,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; @@ -96,7 +97,8 @@ * It is important to note that compaction log is per-DB instance. Since * each OM DB instance might trigger compactions at different timings. 
*/ -public class RocksDBCheckpointDiffer implements AutoCloseable { +public class RocksDBCheckpointDiffer implements AutoCloseable, + BootstrapStateHandler { private static final Logger LOG = LoggerFactory.getLogger(RocksDBCheckpointDiffer.class); @@ -171,6 +173,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable { private final ScheduledExecutorService executor; private boolean closed; private final long maxAllowedTimeInDag; + private final BootstrapStateHandler.Lock lock + = new BootstrapStateHandler.Lock(); private ColumnFamilyHandle snapshotInfoTableCFHandle; @@ -1160,14 +1164,18 @@ public void pruneOlderSnapshotsWithCompactionHistory() { Set sstFileNodesRemoved = pruneSstFileNodesFromDag(lastCompactionSstFiles); - removeSstFile(sstFileNodesRemoved); - deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths); + try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + removeSstFiles(sstFileNodesRemoved); + deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } /** - * Deletes the SST file from the backup directory if exists. + * Deletes the SST files from the backup directory if exists. 
*/ - private void removeSstFile(Set sstFileNodes) { + private void removeSstFiles(Set sstFileNodes) { for (String sstFileNode: sstFileNodes) { File file = new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION); @@ -1464,8 +1472,11 @@ public void pruneSstFiles() { .map(node -> node.getFileName()) .collect(Collectors.toSet()); } - - removeSstFile(nonLeafSstFiles); + try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + removeSstFiles(nonLeafSstFiles); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @VisibleForTesting @@ -1508,4 +1519,9 @@ public static RocksDBCheckpointDiffer getInstance( configuration)); } } + + @Override + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; + } } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java index 2bad80848131..a531aed0710a 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java @@ -36,13 +36,20 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.graph.MutableGraph; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterEach; @@ -78,6 +85,7 @@ import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -107,6 +115,7 @@ public class TestRocksDBCheckpointDiffer { private File compactionLogDir; private File sstBackUpDir; private ConfigurationSource config; + private ExecutorService executorService = Executors.newCachedThreadPool(); @BeforeEach public void init() { @@ -1064,7 +1073,8 @@ public void testPruneOlderSnapshotsWithCompactionHistory( List compactionLogs, Set expectedNodes, int expectedNumberOfLogFilesDeleted - ) throws IOException { + ) throws IOException, ExecutionException, InterruptedException, + TimeoutException { List filesCreated = new ArrayList<>(); for (int i = 0; i < compactionLogs.size(); i++) { @@ -1085,7 +1095,8 @@ public void testPruneOlderSnapshotsWithCompactionHistory( differ.loadAllCompactionLogs(); - differ.pruneOlderSnapshotsWithCompactionHistory(); + waitForLock(differ, + RocksDBCheckpointDiffer::pruneOlderSnapshotsWithCompactionHistory); Set actualNodesInForwardDAG = differ.getForwardCompactionDAG() .nodes() @@ -1114,6 +1125,29 @@ public void testPruneOlderSnapshotsWithCompactionHistory( } } + // Take the lock, confirm that the consumer doesn't finish + // then release the lock and confirm that the consumer does finish. + private void waitForLock(RocksDBCheckpointDiffer differ, + Consumer c) + throws InterruptedException, ExecutionException, TimeoutException { + + Future future; + // Take the lock and start the consumer. 
+ try (BootstrapStateHandler.Lock lock = + differ.getBootstrapStateLock().lock()) { + future = executorService.submit( + () -> { + c.accept(differ); + return true; + }); + // Confirm that the consumer doesn't finish with lock taken. + assertThrows(TimeoutException.class, + () -> future.get(5000, TimeUnit.MILLISECONDS)); + } + // Confirm consumer finishes when unlocked. + assertTrue(future.get(1000, TimeUnit.MILLISECONDS)); + } + private static Stream sstFilePruningScenarios() { return Stream.of( Arguments.of("Case 1: No compaction.", @@ -1161,7 +1195,8 @@ public void testSstFilePruning( String compactionLog, List initialFiles, List expectedFiles - ) throws IOException { + ) throws IOException, ExecutionException, InterruptedException, + TimeoutException { createFileWithContext(metadataDirName + "/" + compactionLogDirName + "/compaction_log" + COMPACTION_LOG_FILE_NAME_SUFFIX, compactionLog); @@ -1179,7 +1214,9 @@ public void testSstFilePruning( config); differ.loadAllCompactionLogs(); - differ.pruneSstFiles(); + + waitForLock(differ, RocksDBCheckpointDiffer::pruneSstFiles); + Set actualFileSetAfterPruning; try (Stream pathStream = Files.list( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java index 637b402acd6e..498a446be9cc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java @@ -41,6 +41,10 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; 
@@ -54,6 +58,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.TestDataUtil; import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.security.UserGroupInformation; @@ -91,7 +96,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -108,6 +115,7 @@ public class TestOMDbCheckpointServlet { private HttpServletRequest requestMock = null; private HttpServletResponse responseMock = null; private OMDBCheckpointServlet omDbCheckpointServletMock = null; + private BootstrapStateHandler.Lock lock; private File metaDir; private String snapshotDirName; private String snapshotDirName2; @@ -173,6 +181,7 @@ private void setupCluster() throws Exception { omDbCheckpointServletMock = mock(OMDBCheckpointServlet.class); + lock = new OMDBCheckpointServlet.Lock(cluster.getOzoneManager()); doCallRealMethod().when(omDbCheckpointServletMock).init(); requestMock = mock(HttpServletRequest.class); @@ -195,6 +204,9 @@ private void setupCluster() throws Exception { doCallRealMethod().when(omDbCheckpointServletMock) .writeDbDataToStream(any(), any(), any(), any(), any()); + + when(omDbCheckpointServletMock.getBootstrapStateLock()) + .thenReturn(lock); } @Test @@ -596,4 +608,101 @@ private void checkLine(String shortSnapshotLocation, String file1 = files[1]; Assert.assertEquals("hl filenames are the same", file0, file1); } + + @Test + public void testBootstrapLocking() throws Exception { + cluster = 
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build(); + cluster.waitForClusterToBeReady(); + + // Get the bootstrap state handlers + KeyManager keyManager = cluster.getOzoneManager().getKeyManager(); + BootstrapStateHandler keyDeletingService = + keyManager.getDeletingService(); + BootstrapStateHandler snapshotDeletingService = + keyManager.getSnapshotDeletingService(); + BootstrapStateHandler sstFilteringService = + keyManager.getSnapshotSstFilteringService(); + BootstrapStateHandler differ = + cluster.getOzoneManager().getMetadataManager() + .getStore().getRocksDBCheckpointDiffer(); + + ExecutorService executorService = Executors.newCachedThreadPool(); + + OMDBCheckpointServlet omDbCheckpointServlet = new OMDBCheckpointServlet(); + + OMDBCheckpointServlet spyServlet = spy(omDbCheckpointServlet); + ServletContext servletContext = mock(ServletContext.class); + when(servletContext.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)) + .thenReturn(cluster.getOzoneManager()); + doReturn(servletContext).when(spyServlet).getServletContext(); + + spyServlet.init(); + + // Confirm the other handlers are locked out when the bootstrap + // servlet takes the lock. + try (BootstrapStateHandler.Lock lock = + spyServlet.getBootstrapStateLock().lock()) { + confirmServletLocksOutOtherHandler(keyDeletingService, executorService); + confirmServletLocksOutOtherHandler(snapshotDeletingService, + executorService); + confirmServletLocksOutOtherHandler(sstFilteringService, executorService); + confirmServletLocksOutOtherHandler(differ, executorService); + } + // Confirm the servlet is locked out when any of the other + // handlers takes the lock. 
+ confirmOtherHandlerLocksOutServlet(keyDeletingService, spyServlet, + executorService); + confirmOtherHandlerLocksOutServlet(snapshotDeletingService, spyServlet, + executorService); + confirmOtherHandlerLocksOutServlet(sstFilteringService, spyServlet, + executorService); + confirmOtherHandlerLocksOutServlet(differ, spyServlet, + executorService); + + // Confirm that servlet takes the lock when none of the other + // handlers have it. + Future servletTest = checkLock(spyServlet, executorService); + Assert.assertTrue(servletTest.get(10000, TimeUnit.MILLISECONDS)); + + executorService.shutdownNow(); + + } + + // Confirms handler can't take the lock the servlet already has. Assumes + // the servlet has already taken the lock. + private void confirmServletLocksOutOtherHandler(BootstrapStateHandler handler, + ExecutorService executorService) { + Future test = checkLock(handler, executorService); + // Handler should fail to take the lock because the servlet has taken it. + Assert.assertThrows(TimeoutException.class, + () -> test.get(500, TimeUnit.MILLISECONDS)); + } + + // Confirms Servlet can't take lock when handler has it. + private void confirmOtherHandlerLocksOutServlet(BootstrapStateHandler handler, + BootstrapStateHandler servlet, ExecutorService executorService) + throws InterruptedException { + try (BootstrapStateHandler.Lock lock = + handler.getBootstrapStateLock().lock()) { + Future test = checkLock(servlet, executorService); + // Servlet should fail to lock when other handler has taken it. + Assert.assertThrows(TimeoutException.class, + () -> test.get(500, TimeUnit.MILLISECONDS)); + } + } + + // Confirm lock is available by having handler take and release it. 
+ private Future checkLock(BootstrapStateHandler handler, + ExecutorService executorService) { + return executorService.submit(() -> { + try { + handler.getBootstrapStateLock().lock(); + handler.getBootstrapStateLock().unlock(); + return true; + } catch (InterruptedException e) { + } + return false; + }); + + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 14f7ca72d2f7..1ebe65ad2f55 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -28,6 +28,8 @@ import org.apache.hadoop.ozone.om.fs.OzoneManagerFS; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.hadoop.ozone.om.service.KeyDeletingService; +import org.apache.hadoop.ozone.om.service.SnapshotDeletingService; import java.io.IOException; import java.time.Duration; @@ -153,7 +155,7 @@ ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count, * Returns the instance of Deleting Service. * @return Background service. */ - BackgroundService getDeletingService(); + KeyDeletingService getDeletingService(); OmMultipartUploadList listMultipartUploads(String volumeName, @@ -246,11 +248,11 @@ List getPendingDeletionSubFiles(long volumeId, * Returns the instance of Snapshot SST Filtering service. * @return Background service. */ - BackgroundService getSnapshotSstFilteringService(); + SstFilteringService getSnapshotSstFilteringService(); /** * Returns the instance of Snapshot Deleting service. * @return Background service. 
*/ - BackgroundService getSnapshotDeletingService(); + SnapshotDeletingService getSnapshotDeletingService(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 4d6a91e358f2..588f7a5de61d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -165,10 +165,10 @@ public class KeyManagerImpl implements KeyManager { private final OzoneBlockTokenSecretManager secretManager; private final boolean grpcBlockTokenEnabled; - private BackgroundService keyDeletingService; + private KeyDeletingService keyDeletingService; - private BackgroundService snapshotSstFilteringService; - private BackgroundService snapshotDeletingService; + private SstFilteringService snapshotSstFilteringService; + private SnapshotDeletingService snapshotDeletingService; private final KeyProviderCryptoExtension kmsProvider; private final boolean enableFileSystemPaths; @@ -630,7 +630,7 @@ public OMMetadataManager getMetadataManager() { } @Override - public BackgroundService getDeletingService() { + public KeyDeletingService getDeletingService() { return keyDeletingService; } @@ -643,11 +643,11 @@ public BackgroundService getOpenKeyCleanupService() { return openKeyCleanupService; } - public BackgroundService getSnapshotSstFilteringService() { + public SstFilteringService getSnapshotSstFilteringService() { return snapshotSstFilteringService; } - public BackgroundService getSnapshotDeletingService() { + public SnapshotDeletingService getSnapshotDeletingService() { return snapshotDeletingService; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java index 
15423a16dabb..bf66528ffb5a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -76,11 +77,13 @@ * If Kerberos is not enabled, simply append the login user name to * `ozone.administrator`, e.g. `scm` */ -public class OMDBCheckpointServlet extends DBCheckpointServlet { +public class OMDBCheckpointServlet extends DBCheckpointServlet + implements BootstrapStateHandler { private static final Logger LOG = LoggerFactory.getLogger(OMDBCheckpointServlet.class); private static final long serialVersionUID = 1L; + private transient BootstrapStateHandler.Lock lock; @Override public void init() throws ServletException { @@ -112,6 +115,8 @@ public void init() throws ServletException { allowedUsers, allowedGroups, om.isSpnegoEnabled()); + + lock = new Lock(om); } @Override @@ -129,26 +134,26 @@ public void writeDbDataToStream(DBCheckpoint checkpoint, // Map of link to path. 
Map hardLinkFiles = new HashMap<>(); - getFilesForArchive(checkpoint, copyFiles, hardLinkFiles, - includeSnapshotData(request)); - - // Exclude file - Map finalCopyFiles = new HashMap<>(); - copyFiles.forEach((o, path) -> { - String fName = path.getFileName().toString(); - if (!toExcludeList.contains(fName)) { - finalCopyFiles.put(o, path); - } else { - excludedList.add(fName); - } - }); - - try (TarArchiveOutputStream archiveOutputStream = - new TarArchiveOutputStream(destination)) { + try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock(); + TarArchiveOutputStream archiveOutputStream = + new TarArchiveOutputStream(destination)) { archiveOutputStream .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); archiveOutputStream .setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX); + getFilesForArchive(checkpoint, copyFiles, hardLinkFiles, + includeSnapshotData(request)); + + // Exclude file + Map finalCopyFiles = new HashMap<>(); + copyFiles.forEach((o, path) -> { + String fName = path.getFileName().toString(); + if (!toExcludeList.contains(fName)) { + finalCopyFiles.put(o, path); + } else { + excludedList.add(fName); + } + }); writeFilesToArchive(finalCopyFiles, hardLinkFiles, archiveOutputStream); } } @@ -290,4 +295,64 @@ private OzoneConfiguration getConf() { .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)) .getConfiguration(); } + + @Override + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; + } + + static class Lock extends BootstrapStateHandler.Lock { + private final BootstrapStateHandler keyDeletingService; + private final BootstrapStateHandler sstFilteringService; + private final BootstrapStateHandler rocksDbCheckpointDiffer; + private final BootstrapStateHandler snapshotDeletingService; + private final OzoneManager om; + + Lock(OzoneManager om) { + this.om = om; + keyDeletingService = om.getKeyManager().getDeletingService(); + sstFilteringService = om.getKeyManager().getSnapshotSstFilteringService(); + 
rocksDbCheckpointDiffer = om.getMetadataManager().getStore() + .getRocksDBCheckpointDiffer(); + snapshotDeletingService = om.getKeyManager().getSnapshotDeletingService(); + } + + /** + * Locks every handler, then waits for the double buffer flush. If any + * step fails, the handler locks already taken are released, because + * try-with-resources never closes a resource whose lock() call threw. + */ + @Override + public BootstrapStateHandler.Lock lock() + throws InterruptedException { + BootstrapStateHandler[] handlers = {keyDeletingService, + sstFilteringService, rocksDbCheckpointDiffer, + snapshotDeletingService}; + int held = 0; + try { + // First lock all the handlers. + for (BootstrapStateHandler handler : handlers) { + handler.getBootstrapStateLock().lock(); + held++; + } + // Then wait for the double buffer to be flushed. + om.getOmRatisServer().getOmStateMachine().awaitDoubleBufferFlush(); + return this; + } catch (InterruptedException | RuntimeException e) { + // Release in reverse order so no handler stays locked out. + while (held > 0) { + handlers[--held].getBootstrapStateLock().unlock(); + } + throw e; + } + } + + @Override + public void unlock() { + snapshotDeletingService.getBootstrapStateLock().unlock(); + rocksDbCheckpointDiffer.getBootstrapStateLock().unlock(); + sstFilteringService.getBootstrapStateLock().unlock(); + keyDeletingService.getBootstrapStateLock().unlock(); + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java index 5d3b988bc678..ed519f6fe19f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.ozone.rocksdiff.RocksDiffUtils; import org.rocksdb.RocksDBException; @@ -66,7 +67,8 @@ * all the irrelevant and safe to delete sst files that don't correspond * to the bucket on which the snapshot was taken. 
*/ -public class SstFilteringService extends BackgroundService { +public class SstFilteringService extends BackgroundService + implements BootstrapStateHandler { private static final Logger LOG = LoggerFactory.getLogger(SstFilteringService.class); @@ -102,6 +104,9 @@ public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout, snapshotFilteredCount = new AtomicLong(0); } + private final BootstrapStateHandler.Lock lock = + new BootstrapStateHandler.Lock(); + private class SstFilteringTask implements BackgroundTask { @Override @@ -152,7 +157,10 @@ public BackgroundTaskResult call() throws Exception { new File(snapshotCheckpointDir), dbName, true, Optional.of(Boolean.TRUE), false)) { RocksDatabase db = rdbStore.getDb(); - db.deleteFilesNotMatchingPrefix(prefixPairs, filterFunction); + try (BootstrapStateHandler.Lock lock = + getBootstrapStateLock().lock()) { + db.deleteFilesNotMatchingPrefix(prefixPairs, filterFunction); + } } // mark the snapshot as filtered by writing to the file @@ -215,4 +223,8 @@ public AtomicLong getSnapshotFilteredCount() { return snapshotFilteredCount; } + @Override + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java index 6c89828e3bb9..fc2170e6a11d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java @@ -20,7 +20,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.utils.db.BatchOperation; -import org.apache.hadoop.hdds.utils.db.DBStore; +import 
org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmSnapshot; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -79,13 +79,16 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException { if (nextSnapshot != null) { - DBStore nextSnapshotStore = nextSnapshot.getMetadataManager().getStore(); + RDBStore nextSnapshotStore = + (RDBStore) nextSnapshot.getMetadataManager().getStore(); // Init Batch Operation for snapshot db. try (BatchOperation writeBatch = nextSnapshotStore.initBatchOperation()) { processKeys(writeBatch, nextSnapshot.getMetadataManager(), nextDBKeysList, true); processDirs(writeBatch, nextSnapshot.getMetadataManager()); nextSnapshotStore.commitBatchOperation(writeBatch); + nextSnapshotStore.getDb().flushWal(true); + nextSnapshotStore.getDb().flush(); } } else { // Handle the case where there is no next Snapshot. @@ -94,12 +97,15 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, } // Update From Snapshot Deleted Table. 
- DBStore fromSnapshotStore = fromSnapshot.getMetadataManager().getStore(); + RDBStore fromSnapshotStore = + (RDBStore) fromSnapshot.getMetadataManager().getStore(); try (BatchOperation fromSnapshotBatchOp = fromSnapshotStore.initBatchOperation()) { processKeys(fromSnapshotBatchOp, fromSnapshot.getMetadataManager(), reclaimKeysList, false); fromSnapshotStore.commitBatchOperation(fromSnapshotBatchOp); + fromSnapshotStore.getDb().flushWal(true); + fromSnapshotStore.getDb().flush(); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index 45230073b0f3..27d2ebf75282 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.hdds.utils.db.BatchOperation; @@ -26,6 +27,7 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -33,11 +35,15 @@ import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import 
org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.ClientId; @@ -59,7 +65,8 @@ * Abstracts common code from KeyDeletingService and DirectoryDeletingService * which is now used by SnapshotDeletingService as well. 
*/ -public abstract class AbstractKeyDeletingService extends BackgroundService { +public abstract class AbstractKeyDeletingService extends BackgroundService + implements BootstrapStateHandler { private final OzoneManager ozoneManager; private final ScmBlockLocationProtocol scmClient; @@ -68,6 +75,8 @@ public abstract class AbstractKeyDeletingService extends BackgroundService { private final AtomicLong movedDirsCount; private final AtomicLong movedFilesCount; private final AtomicLong runCount; + private final BootstrapStateHandler.Lock lock = + new BootstrapStateHandler.Lock(); public AbstractKeyDeletingService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long serviceTimeout, @@ -454,4 +463,67 @@ public long getMovedDirsCount() { public long getMovedFilesCount() { return movedFilesCount.get(); } + + /** + * Builds a SnapshotMoveDeletedKeys request for the given snapshot and + * submits it while holding the lock from getBootstrapStateLock(), the + * same instance other holders contend on, rather than a private one. + * @throws InterruptedException if interrupted while waiting for the lock. + */ + protected void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo, + List toReclaimList, + List toNextDBList, + List renamedList, + List dirsToMove) throws InterruptedException { + + SnapshotMoveDeletedKeysRequest.Builder moveDeletedKeysBuilder = + SnapshotMoveDeletedKeysRequest.newBuilder() + .setFromSnapshot(snapInfo.getProtobuf()); + + SnapshotMoveDeletedKeysRequest moveDeletedKeys = moveDeletedKeysBuilder + .addAllReclaimKeys(toReclaimList) + .addAllNextDBKeys(toNextDBList) + .addAllRenamedKeys(renamedList) + .addAllDeletedDirsToMove(dirsToMove) + .build(); + + OMRequest omRequest = OMRequest.newBuilder() + .setCmdType(Type.SnapshotMoveDeletedKeys) + .setSnapshotMoveDeletedKeysRequest(moveDeletedKeys) + .setClientId(clientId.toString()) + .build(); + + try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + submitRequest(omRequest); + } + } + + protected void submitRequest(OMRequest omRequest) { + try { + if (isRatisEnabled()) { + OzoneManagerRatisServer server = ozoneManager.getOmRatisServer(); + + RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() + .setClientId(clientId) + 
.setServerId(server.getRaftPeerId()) + .setGroupId(server.getRaftGroupId()) + .setCallId(getRunCount().get()) + .setMessage(Message.valueOf( + OMRatisHelper.convertRequestToByteString(omRequest))) + .setType(RaftClientRequest.writeRequestType()) + .build(); + + server.submitRequest(omRequest, raftClientRequest); + } else { + ozoneManager.getOmServerProtocol().submitRequest(null, omRequest); + } + } catch (ServiceException e) { + LOG.error("Snapshot Deleting request failed. " + + "Will retry at next run.", e); + } + } + + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index f859b67ec7a8..69394d347b0d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.om.service; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ServiceException; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -37,23 +36,18 @@ import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.SnapshotChainManager; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import 
org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -425,32 +419,6 @@ private void splitRepeatedOmKeyInfo(SnapshotMoveKeyInfos.Builder toReclaim, } } - private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo, - List toReclaimList, - List toNextDBList, - List renamedList, - List dirsToMove) { - - SnapshotMoveDeletedKeysRequest.Builder moveDeletedKeysBuilder = - SnapshotMoveDeletedKeysRequest.newBuilder() - .setFromSnapshot(snapInfo.getProtobuf()); - - SnapshotMoveDeletedKeysRequest moveDeletedKeys = moveDeletedKeysBuilder - .addAllReclaimKeys(toReclaimList) - .addAllNextDBKeys(toNextDBList) - .addAllRenamedKeys(renamedList) - .addAllDeletedDirsToMove(dirsToMove) - .build(); - - OMRequest omRequest = OMRequest.newBuilder() - .setCmdType(Type.SnapshotMoveDeletedKeys) - .setSnapshotMoveDeletedKeysRequest(moveDeletedKeys) - .setClientId(clientId.toString()) - .build(); - - submitRequest(omRequest); - } - private boolean checkDirReclaimable( Table.KeyValue deletedDir, Table previousDirTable, @@ -571,31 +539,6 @@ private SnapshotInfo getPreviousSnapshot(SnapshotInfo snapInfo) } return null; } - - private void submitRequest(OMRequest omRequest) { - try { - if (isRatisEnabled()) { - 
OzoneManagerRatisServer server = ozoneManager.getOmRatisServer(); - - RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(server.getRaftPeerId()) - .setGroupId(server.getRaftGroupId()) - .setCallId(getRunCount().get()) - .setMessage(Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - - server.submitRequest(omRequest, raftClientRequest); - } else { - ozoneManager.getOmServerProtocol().submitRequest(null, omRequest); - } - } catch (ServiceException e) { - LOG.error("Snapshot Deleting request failed. " + - "Will retry at next run.", e); - } - } } @Override diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java index 4b0be53a6cc6..9b6061447ff3 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.utils.db.DBProfile; import org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; @@ -64,6 +65,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; /** * Test SST Filtering Service. 
@@ -136,7 +139,7 @@ public void testIrrelevantSstFileDeletion() final int keyCount = 100; createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1); SstFilteringService sstFilteringService = - (SstFilteringService) keyManager.getSnapshotSstFilteringService(); + keyManager.getSnapshotSstFilteringService(); String rocksDbDir = om.getRocksDbDirectory(); @@ -222,6 +225,23 @@ public void testIrrelevantSstFileDeletion() Assert.assertTrue( processedSnapshotIds.contains(snapshotInfo.getSnapshotID())); + long count; + // Prevent the new snapshot from being filtered + try (BootstrapStateHandler.Lock lock = + sstFilteringService.getBootstrapStateLock().lock()) { + count = sstFilteringService.getSnapshotFilteredCount().get(); + writeClient.createSnapshot("vol1", "buck2", "snapshot2"); + + // Confirm that it is not filtered + assertThrows(TimeoutException.class, () -> GenericTestUtils.waitFor( + () -> sstFilteringService.getSnapshotFilteredCount().get() > count, + 1000, 10000)); + assertEquals(count, sstFilteringService.getSnapshotFilteredCount().get()); + } + // Now allow filtering + GenericTestUtils.waitFor( + () -> sstFilteringService.getSnapshotFilteredCount().get() > count, + 1000, 10000); } @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")