BootstrapStateHandler.java (new file)
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.lock;

import java.util.concurrent.Semaphore;

/** Bootstrap state handler interface. */
public interface BootstrapStateHandler {
  Lock getBootstrapStateLock();

  /** Bootstrap state handler lock implementation. */
  class Lock implements AutoCloseable {
    private final Semaphore semaphore = new Semaphore(1);
    public Lock lock() throws InterruptedException {
      semaphore.acquire();
      return this;
    }
    public void unlock() {
      semaphore.release();
    }
    public void close() {
      unlock();
    }
  }
}
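Note: because Lock implements AutoCloseable, callers are expected to acquire it with try-with-resources so the single semaphore permit is always released, even if the guarded work throws. A minimal usage sketch follows (ExampleService and doExclusiveWork are hypothetical names for illustration only, not part of this change; assumes the class lives in, or imports from, org.apache.hadoop.ozone.lock):

// Hypothetical illustration only; not part of this change.
class ExampleService implements BootstrapStateHandler {
  private final BootstrapStateHandler.Lock lock =
      new BootstrapStateHandler.Lock();

  @Override
  public BootstrapStateHandler.Lock getBootstrapStateLock() {
    return lock;
  }

  void doExclusiveWork() throws InterruptedException {
    // lock() blocks until the single permit is available; close() releases it.
    try (BootstrapStateHandler.Lock l = getBootstrapStateLock().lock()) {
      // Work that must not overlap with an OM bootstrap in progress.
    }
  }
}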
RocksDBCheckpointDiffer.java
@@ -67,6 +67,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
@@ -96,7 +97,8 @@
* It is important to note that the compaction log is per-DB instance, since
* each OM DB instance might trigger compactions at different times.
*/
public class RocksDBCheckpointDiffer implements AutoCloseable {
public class RocksDBCheckpointDiffer implements AutoCloseable,
BootstrapStateHandler {

private static final Logger LOG =
LoggerFactory.getLogger(RocksDBCheckpointDiffer.class);
@@ -171,6 +173,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
private final ScheduledExecutorService executor;
private boolean closed;
private final long maxAllowedTimeInDag;
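// Guards SST file and compaction-log pruning so it cannot run while a
// bootstrap checkpoint is being served (see getBootstrapStateLock()).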
private final BootstrapStateHandler.Lock lock
= new BootstrapStateHandler.Lock();

private ColumnFamilyHandle snapshotInfoTableCFHandle;

@@ -1160,14 +1164,18 @@ public void pruneOlderSnapshotsWithCompactionHistory() {

Set<String> sstFileNodesRemoved =
pruneSstFileNodesFromDag(lastCompactionSstFiles);
removeSstFile(sstFileNodesRemoved);
deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
removeSstFiles(sstFileNodesRemoved);
deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/**
* Deletes the SST file from the backup directory if exists.
* Deletes the SST files from the backup directory if they exist.
*/
private void removeSstFile(Set<String> sstFileNodes) {
private void removeSstFiles(Set<String> sstFileNodes) {
for (String sstFileNode: sstFileNodes) {
File file =
new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
@@ -1464,8 +1472,11 @@ public void pruneSstFiles() {
.map(node -> node.getFileName())
.collect(Collectors.toSet());
}

removeSstFile(nonLeafSstFiles);
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
removeSstFiles(nonLeafSstFiles);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@VisibleForTesting
@@ -1508,4 +1519,9 @@ public static RocksDBCheckpointDiffer getInstance(
configuration));
}
}

@Override
public BootstrapStateHandler.Lock getBootstrapStateLock() {
return lock;
}
}
TestRocksDBCheckpointDiffer.java
@@ -36,13 +36,20 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.graph.MutableGraph;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -78,6 +85,7 @@
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@@ -107,6 +115,7 @@ public class TestRocksDBCheckpointDiffer {
private File compactionLogDir;
private File sstBackUpDir;
private ConfigurationSource config;
private ExecutorService executorService = Executors.newCachedThreadPool();

@BeforeEach
public void init() {
@@ -1064,7 +1073,8 @@ public void testPruneOlderSnapshotsWithCompactionHistory(
List<String> compactionLogs,
Set<String> expectedNodes,
int expectedNumberOfLogFilesDeleted
) throws IOException {
) throws IOException, ExecutionException, InterruptedException,
TimeoutException {
List<File> filesCreated = new ArrayList<>();

for (int i = 0; i < compactionLogs.size(); i++) {
@@ -1085,7 +1095,8 @@

differ.loadAllCompactionLogs();

differ.pruneOlderSnapshotsWithCompactionHistory();
waitForLock(differ,
RocksDBCheckpointDiffer::pruneOlderSnapshotsWithCompactionHistory);

Set<String> actualNodesInForwardDAG = differ.getForwardCompactionDAG()
.nodes()
@@ -1114,6 +1125,29 @@
}
}

// Take the lock, confirm that the consumer doesn't finish,
// then release the lock and confirm that the consumer does finish.
private void waitForLock(RocksDBCheckpointDiffer differ,
Consumer<RocksDBCheckpointDiffer> c)
throws InterruptedException, ExecutionException, TimeoutException {

Future<Boolean> future;
// Take the lock and start the consumer.
try (BootstrapStateHandler.Lock lock =
differ.getBootstrapStateLock().lock()) {
future = executorService.submit(
() -> {
c.accept(differ);
return true;
});
// Confirm that the consumer doesn't finish with lock taken.
assertThrows(TimeoutException.class,
() -> future.get(5000, TimeUnit.MILLISECONDS));
}
// Confirm consumer finishes when unlocked.
assertTrue(future.get(1000, TimeUnit.MILLISECONDS));
}

private static Stream<Arguments> sstFilePruningScenarios() {
return Stream.of(
Arguments.of("Case 1: No compaction.",
@@ -1161,7 +1195,8 @@ public void testSstFilePruning(
String compactionLog,
List<String> initialFiles,
List<String> expectedFiles
) throws IOException {
) throws IOException, ExecutionException, InterruptedException,
TimeoutException {
createFileWithContext(metadataDirName + "/" + compactionLogDirName
+ "/compaction_log" + COMPACTION_LOG_FILE_NAME_SUFFIX,
compactionLog);
@@ -1179,7 +1214,9 @@
config);

differ.loadAllCompactionLogs();
differ.pruneSstFiles();

waitForLock(differ, RocksDBCheckpointDiffer::pruneSstFiles);


Set<String> actualFileSetAfterPruning;
try (Stream<Path> pathStream = Files.list(
TestOMDbCheckpointServlet.java
@@ -41,6 +41,10 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -54,6 +58,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.security.UserGroupInformation;
@@ -91,7 +96,9 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -108,6 +115,7 @@ public class TestOMDbCheckpointServlet {
private HttpServletRequest requestMock = null;
private HttpServletResponse responseMock = null;
private OMDBCheckpointServlet omDbCheckpointServletMock = null;
private BootstrapStateHandler.Lock lock;
private File metaDir;
private String snapshotDirName;
private String snapshotDirName2;
@@ -173,6 +181,7 @@ private void setupCluster() throws Exception {
omDbCheckpointServletMock =
mock(OMDBCheckpointServlet.class);

lock = new OMDBCheckpointServlet.Lock(cluster.getOzoneManager());
doCallRealMethod().when(omDbCheckpointServletMock).init();

requestMock = mock(HttpServletRequest.class);
@@ -195,6 +204,9 @@

doCallRealMethod().when(omDbCheckpointServletMock)
.writeDbDataToStream(any(), any(), any(), any(), any());

when(omDbCheckpointServletMock.getBootstrapStateLock())
.thenReturn(lock);
}

@Test
@@ -596,4 +608,101 @@ private void checkLine(String shortSnapshotLocation,
String file1 = files[1];
Assert.assertEquals("hl filenames are the same", file0, file1);
}

@Test
public void testBootstrapLocking() throws Exception {
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();

// Get the bootstrap state handlers
KeyManager keyManager = cluster.getOzoneManager().getKeyManager();
BootstrapStateHandler keyDeletingService =
keyManager.getDeletingService();
BootstrapStateHandler snapshotDeletingService =
keyManager.getSnapshotDeletingService();
BootstrapStateHandler sstFilteringService =
keyManager.getSnapshotSstFilteringService();
BootstrapStateHandler differ =
cluster.getOzoneManager().getMetadataManager()
.getStore().getRocksDBCheckpointDiffer();

ExecutorService executorService = Executors.newCachedThreadPool();

OMDBCheckpointServlet omDbCheckpointServlet = new OMDBCheckpointServlet();

OMDBCheckpointServlet spyServlet = spy(omDbCheckpointServlet);
ServletContext servletContext = mock(ServletContext.class);
when(servletContext.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE))
.thenReturn(cluster.getOzoneManager());
doReturn(servletContext).when(spyServlet).getServletContext();

spyServlet.init();

// Confirm the other handlers are locked out when the bootstrap
// servlet takes the lock.
try (BootstrapStateHandler.Lock lock =
spyServlet.getBootstrapStateLock().lock()) {
confirmServletLocksOutOtherHandler(keyDeletingService, executorService);
confirmServletLocksOutOtherHandler(snapshotDeletingService,
executorService);
confirmServletLocksOutOtherHandler(sstFilteringService, executorService);
confirmServletLocksOutOtherHandler(differ, executorService);
}
// Confirm the servlet is locked out when any of the other
// handlers takes the lock.
confirmOtherHandlerLocksOutServlet(keyDeletingService, spyServlet,
executorService);
confirmOtherHandlerLocksOutServlet(snapshotDeletingService, spyServlet,
executorService);
confirmOtherHandlerLocksOutServlet(sstFilteringService, spyServlet,
executorService);
confirmOtherHandlerLocksOutServlet(differ, spyServlet,
executorService);

// Confirm that servlet takes the lock when none of the other
// handlers have it.
Future<Boolean> servletTest = checkLock(spyServlet, executorService);
Assert.assertTrue(servletTest.get(10000, TimeUnit.MILLISECONDS));

executorService.shutdownNow();

}

// Confirms handler can't take the lock the servlet already has. Assumes
// the servlet has already taken the lock.
private void confirmServletLocksOutOtherHandler(BootstrapStateHandler handler,
ExecutorService executorService) {
Future<Boolean> test = checkLock(handler, executorService);
// Handler should fail to take the lock because the servlet has taken it.
Assert.assertThrows(TimeoutException.class,
() -> test.get(500, TimeUnit.MILLISECONDS));
}

// Confirms Servlet can't take lock when handler has it.
private void confirmOtherHandlerLocksOutServlet(BootstrapStateHandler handler,
BootstrapStateHandler servlet, ExecutorService executorService)
throws InterruptedException {
try (BootstrapStateHandler.Lock lock =
handler.getBootstrapStateLock().lock()) {
Future<Boolean> test = checkLock(servlet, executorService);
// Servlet should fail to lock when other handler has taken it.
Assert.assertThrows(TimeoutException.class,
() -> test.get(500, TimeUnit.MILLISECONDS));
}
}

// Confirm lock is available by having handler take and release it.
private Future<Boolean> checkLock(BootstrapStateHandler handler,
ExecutorService executorService) {
return executorService.submit(() -> {
try {
handler.getBootstrapStateLock().lock();
handler.getBootstrapStateLock().unlock();
return true;
} catch (InterruptedException e) {
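// Interrupted while acquiring the lock: fall through and return false.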
}
return false;
});

}
}