Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -858,27 +858,25 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,
* e.g. ["/path/to/sstBackupDir/000050.sst",
* "/path/to/sstBackupDir/000060.sst"]
*/
public List<String> getSSTDiffListWithFullPath(
public synchronized List<String> getSSTDiffListWithFullPath(
DifferSnapshotInfo src,
DifferSnapshotInfo dest,
String sstFilesDirForSnapDiffJob
) throws IOException {

synchronized (this) {
List<String> sstDiffList = getSSTDiffList(src, dest);

return sstDiffList.stream()
.map(
sst -> {
String sstFullPath = getSSTFullPath(sst, src.getDbPath());
Path link = Paths.get(sstFilesDirForSnapDiffJob,
sst + SST_FILE_EXTENSION);
Path srcFile = Paths.get(sstFullPath);
createLink(link, srcFile);
return link.toString();
})
.collect(Collectors.toList());
}
List<String> sstDiffList = getSSTDiffList(src, dest);

return sstDiffList.stream()
.map(
sst -> {
String sstFullPath = getSSTFullPath(sst, src.getDbPath());
Path link = Paths.get(sstFilesDirForSnapDiffJob,
sst + SST_FILE_EXTENSION);
Path srcFile = Paths.get(sstFullPath);
createLink(link, srcFile);
return link.toString();
})
.collect(Collectors.toList());
}

/**
Expand All @@ -892,8 +890,8 @@ public List<String> getSSTDiffListWithFullPath(
* @param dest destination snapshot
* @return A list of SST files without extension. e.g. ["000050", "000060"]
*/
public List<String> getSSTDiffList(DifferSnapshotInfo src,
DifferSnapshotInfo dest)
public synchronized List<String> getSSTDiffList(DifferSnapshotInfo src,
DifferSnapshotInfo dest)
throws IOException {

// TODO: Reject or swap if dest is taken after src, once snapshot chain
Expand Down Expand Up @@ -963,7 +961,7 @@ public void filterRelevantSstFilesFullPath(Set<String> inputFiles,
* diffing). Otherwise, add it to the differentFiles map, as it will
* need further diffing.
*/
void internalGetSSTDiffList(
synchronized void internalGetSSTDiffList(
DifferSnapshotInfo src, DifferSnapshotInfo dest,
Set<String> srcSnapFiles, Set<String> destSnapFiles,
MutableGraph<CompactionNode> mutableGraph,
Expand Down Expand Up @@ -1163,6 +1161,9 @@ public void pruneOlderSnapshotsWithCompactionHistory() {

Set<String> sstFileNodesRemoved =
pruneSstFileNodesFromDag(lastCompactionSstFiles);

LOG.info("Removing SST files: {} as part of compaction DAG pruning.",
sstFileNodesRemoved);
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
removeSstFiles(sstFileNodesRemoved);
deleteOlderSnapshotsCompactionFiles(olderSnapshotsLogFilePaths);
Expand Down Expand Up @@ -1463,14 +1464,15 @@ private SnapshotLogInfo(long snapshotGenerationId,
* those are not needed to generate snapshot diff. These files are basically
* non-leaf nodes of the DAG.
*/
public void pruneSstFiles() {
public synchronized void pruneSstFiles() {
Set<String> nonLeafSstFiles;
synchronized (this) {
nonLeafSstFiles = forwardCompactionDAG.nodes().stream()
.filter(node -> !forwardCompactionDAG.successors(node).isEmpty())
.map(node -> node.getFileName())
.collect(Collectors.toSet());
}
nonLeafSstFiles = forwardCompactionDAG.nodes().stream()
.filter(node -> !forwardCompactionDAG.successors(node).isEmpty())
.map(node -> node.getFileName())
.collect(Collectors.toSet());

LOG.info("Removing SST files: {} as part of SST file pruning.",
nonLeafSstFiles);
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
removeSstFiles(nonLeafSstFiles);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.CONTAINS_SNAPSHOT;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION;
import static org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
Expand All @@ -108,6 +109,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertThrows;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -225,7 +227,7 @@ private void init() throws Exception {

private static void expectFailurePreFinalization(LambdaTestUtils.
VoidCallable eval) throws Exception {
OMException ex = Assert.assertThrows(OMException.class,
OMException ex = assertThrows(OMException.class,
() -> eval.call());
Assert.assertEquals(ex.getResult(),
NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION);
Expand Down Expand Up @@ -583,6 +585,12 @@ public void testSnapDiff() throws Exception {
key1 = createFileKeyWithPrefix(bucket1, key1);
String snap1 = "snap" + counter.incrementAndGet();
createSnapshot(volume, bucket, snap1);

// When from and to snapshots are same, it returns empty response.
SnapshotDiffReportOzone
diff0 = getSnapDiffReport(volume, bucket, snap1, snap1);
assertTrue(diff0.getDiffList().isEmpty());

// Do nothing, take another snapshot
String snap2 = "snap" + counter.incrementAndGet();
createSnapshot(volume, bucket, snap2);
Expand Down Expand Up @@ -809,16 +817,28 @@ public void testSnapDiffNoSnapshot() throws Exception {
String snap1 = "snap" + counter.incrementAndGet();
createSnapshot(volume, bucket, snap1);
String snap2 = "snap" + counter.incrementAndGet();

// Destination snapshot is invalid
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
OMException omException = assertThrows(OMException.class,
() -> store.snapshotDiff(volume, bucket, snap1, snap2,
null, 0, false, false));
assertEquals(KEY_NOT_FOUND, omException.getResult());
// From snapshot is invalid
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
() -> store.snapshotDiff(volume, bucket, snap2, snap1,
null, 0, false, false));
omException = assertThrows(OMException.class,
() -> store.snapshotDiff(volume, bucket, snap2, snap1,
null, 0, false, false));

assertEquals(KEY_NOT_FOUND, omException.getResult());

createSnapshot(volume, bucket, snap2);

omException = assertThrows(OMException.class, () ->
store.snapshotDiff(volume, bucket, snap2, snap1, null, 0, false, false)
);

assertEquals(INTERNAL_ERROR, omException.getResult());
assertEquals("fromSnapshot:" + snap2 + " should be older than to " +
"toSnapshot:" + snap1, omException.getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -96,8 +97,10 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_KEY_NAME;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotDiffManager.getSnapshotRootPath;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.dropColumnFamilyHandle;
import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;

/**
* This class is used to manage/create OM snapshots.
Expand Down Expand Up @@ -711,6 +714,14 @@ public SnapshotDiffResponse getSnapshotDiffReport(final String volume,

validateSnapshotsExistAndActive(volume, bucket, fromSnapshot, toSnapshot);

// Check if fromSnapshot and toSnapshot are equal.
if (Objects.equals(fromSnapshot, toSnapshot)) {
SnapshotDiffReportOzone diffReport = new SnapshotDiffReportOzone(
getSnapshotRootPath(volume, bucket).toString(), volume, bucket,
fromSnapshot, toSnapshot, Collections.emptyList(), null);
return new SnapshotDiffResponse(diffReport, DONE, 0L);
}

int index = getIndexFromToken(token);
if (pageSize <= 0 || pageSize > maxPageSize) {
pageSize = maxPageSize;
Expand Down Expand Up @@ -776,6 +787,7 @@ private void validateSnapshotsExistAndActive(final String volumeName,
// Block SnapDiff if either of the snapshots is not active.
checkSnapshotActive(fromSnapInfo, false);
checkSnapshotActive(toSnapInfo, false);

// Check snapshot creation time
if (fromSnapInfo.getCreationTime() > toSnapInfo.getCreationTime()) {
throw new IOException("fromSnapshot:" + fromSnapInfo.getName() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public SnapshotDiffResponse getSnapshotDiffReport(
}

@NotNull
private static OFSPath getSnapshotRootPath(String volume, String bucket) {
public static OFSPath getSnapshotRootPath(String volume, String bucket) {
org.apache.hadoop.fs.Path bucketPath = new org.apache.hadoop.fs.Path(
OZONE_URI_DELIMITER + volume + OZONE_URI_DELIMITER + bucket);
return new OFSPath(bucketPath, new OzoneConfiguration());
Expand Down Expand Up @@ -1200,9 +1200,14 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
getDSIFromSI(tsInfo, toSnapshot, volume, bucket);

LOG.debug("Calling RocksDBCheckpointDiffer");
List<String> sstDiffList =
differ.getSSTDiffListWithFullPath(toDSI, fromDSI, diffDir);
deltaFiles.addAll(sstDiffList);
try {
List<String> sstDiffList =
differ.getSSTDiffListWithFullPath(toDSI, fromDSI, diffDir);
deltaFiles.addAll(sstDiffList);
} catch (Exception exception) {
LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. " +
"It will fallback to full diff now.", exception);
}
}

if (useFullDiff || deltaFiles.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.rocksdb.RocksIterator;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
Expand Down Expand Up @@ -163,6 +164,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -459,22 +461,78 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles,
});
return null;
});
SnapshotDiffManager spy = spy(snapshotDiffManager);
UUID snap1 = UUID.randomUUID();
UUID snap2 = UUID.randomUUID();
if (!useFullDiff) {
Set<String> randomStrings = Collections.emptySet();
when(differ.getSSTDiffListWithFullPath(
any(DifferSnapshotInfo.class),
any(DifferSnapshotInfo.class),
anyString()))
.thenReturn(Lists.newArrayList(randomStrings));
.thenReturn(Collections.emptyList());
}

SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1);
when(jobTableIterator.isValid()).thenReturn(false);
Set<String> deltaFiles = spy.getDeltaFiles(
Set<String> deltaFiles = snapshotDiffManager.getDeltaFiles(
snapshotCache.get(snap1.toString()),
snapshotCache.get(snap2.toString()),
Arrays.asList("cf1", "cf2"),
fromSnapshotInfo,
toSnapshotInfo,
false,
Collections.emptyMap(),
Files.createTempDirectory("snapdiff_dir").toAbsolutePath()
.toString());
assertEquals(deltaStrings, deltaFiles);
}
}

@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 5, 10, 100, 1000, 10000})
public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles)
throws ExecutionException, RocksDBException, IOException {
try (MockedStatic<RdbUtil> mockedRdbUtil =
Mockito.mockStatic(RdbUtil.class);
MockedStatic<RocksDiffUtils> mockedRocksDiffUtils =
Mockito.mockStatic(RocksDiffUtils.class)) {
Set<String> deltaStrings = new HashSet<>();

mockedRdbUtil.when(
() -> RdbUtil.getSSTFilesForComparison(anyString(), anyList()))
.thenAnswer((Answer<Set<String>>) invocation -> {
Set<String> retVal = IntStream.range(0, numberOfFiles)
.mapToObj(i -> RandomStringUtils.randomAlphabetic(10))
.collect(Collectors.toSet());
deltaStrings.addAll(retVal);
return retVal;
});

mockedRocksDiffUtils.when(() ->
RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap()))
.thenAnswer((Answer<Void>) invocationOnMock -> {
invocationOnMock.getArgument(0, Set.class).stream()
.findAny().ifPresent(val -> {
assertTrue(deltaStrings.contains(val));
invocationOnMock.getArgument(0, Set.class).remove(val);
deltaStrings.remove(val);
});
return null;
});
UUID snap1 = UUID.randomUUID();
UUID snap2 = UUID.randomUUID();

doThrow(new FileNotFoundException("File not found exception."))
.when(differ)
.getSSTDiffListWithFullPath(
any(DifferSnapshotInfo.class),
any(DifferSnapshotInfo.class),
anyString());

SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1);
when(jobTableIterator.isValid()).thenReturn(false);
Set<String> deltaFiles = snapshotDiffManager.getDeltaFiles(
snapshotCache.get(snap1.toString()),
snapshotCache.get(snap2.toString()),
Arrays.asList("cf1", "cf2"),
Expand Down