Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -825,15 +826,15 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,
* e.g. ["/path/to/sstBackupDir/000050.sst",
* "/path/to/sstBackupDir/000060.sst"]
*/
public synchronized List<String> getSSTDiffListWithFullPath(
public synchronized Optional<List<String>> getSSTDiffListWithFullPath(
DifferSnapshotInfo src,
DifferSnapshotInfo dest,
String sstFilesDirForSnapDiffJob
) throws IOException {

List<String> sstDiffList = getSSTDiffList(src, dest);
Optional<List<String>> sstDiffList = getSSTDiffList(src, dest);

return sstDiffList.stream()
return sstDiffList.map(diffList -> diffList.stream()
.map(
sst -> {
String sstFullPath = getSSTFullPath(sst, src.getDbPath());
Expand All @@ -843,7 +844,7 @@ public synchronized List<String> getSSTDiffListWithFullPath(
createLink(link, srcFile);
return link.toString();
})
.collect(Collectors.toList());
.collect(Collectors.toList()));
}

/**
Expand All @@ -857,10 +858,8 @@ public synchronized List<String> getSSTDiffListWithFullPath(
* @param dest destination snapshot
* @return A list of SST files without extension. e.g. ["000050", "000060"]
*/
public synchronized List<String> getSSTDiffList(
DifferSnapshotInfo src,
DifferSnapshotInfo dest
) throws IOException {
public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo src,
DifferSnapshotInfo dest) throws IOException {

// TODO: Reject or swap if dest is taken after src, once snapshot chain
// integration is done.
Expand Down Expand Up @@ -898,8 +897,13 @@ public synchronized List<String> getSSTDiffList(
filterRelevantSstFilesFullPath(fwdDAGDifferentFiles,
src.getTablePrefixes());
}

return new ArrayList<>(fwdDAGDifferentFiles);
// Check if the DAG traversal was able to reach all the destination SST files.
for (String destSnapFile : destSnapFiles) {
if (!fwdDAGSameFiles.contains(destSnapFile) || !fwdDAGDifferentFiles.contains(destSnapFile)) {
return Optional.empty();
}
}
return Optional.of(new ArrayList<>(fwdDAGDifferentFiles));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
int index = 0;
for (DifferSnapshotInfo snap : snapshots) {
// Returns a list of SST files to be fed into RocksDiff
List<String> sstDiffList = differ.getSSTDiffList(src, snap);
List<String> sstDiffList = differ.getSSTDiffList(src, snap).orElse(Collections.emptyList());
LOG.info("SST diff list from '{}' to '{}': {}",
src.getDbPath(), snap.getDbPath(), sstDiffList);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -231,7 +232,7 @@ public void testDAGReconstruction()
final File checkpointSnap2 = new File(snap2.getDbPath());
GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000);

List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1);
List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1).orElse(Collections.emptyList());
LOG.debug("Got diff list: {}", sstDiffList21);

// Delete 1000 keys, take a 3rd snapshot, and do another diff
Expand All @@ -250,13 +251,13 @@ public void testDAGReconstruction()
final File checkpointSnap3 = new File(snap3.getDbPath());
GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);

List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2);
List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2).orElse(Collections.emptyList());

// snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1
List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1);
List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1).orElse(Collections.emptyList());

// Same snapshot. Result should be empty list
List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2);
List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2).orElse(Collections.emptyList());
assertThat(sstDiffList22).isEmpty();
snapDB1.close();
snapDB2.close();
Expand All @@ -282,13 +283,13 @@ public void testDAGReconstruction()
volumeName, bucketName, "snap3",
((RDBStore) snapDB3.get()
.getMetadataManager().getStore()).getDb().getManagedRocksDb());
List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1);
List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1).orElse(Collections.emptyList());
assertEquals(sstDiffList21, sstDiffList21Run2);

List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2);
List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2).orElse(Collections.emptyList());
assertEquals(sstDiffList32, sstDiffList32Run2);

List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1);
List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1).orElse(Collections.emptyList());
assertEquals(sstDiffList31, sstDiffList31Run2);
snapDB1.close();
snapDB2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1139,9 +1139,11 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,

LOG.debug("Calling RocksDBCheckpointDiffer");
try {
List<String> sstDiffList = differ.getSSTDiffListWithFullPath(toDSI,
Optional<List<String>> sstDiffList = differ.getSSTDiffListWithFullPath(toDSI,
fromDSI, diffDir);
deltaFiles.addAll(sstDiffList);
if (sstDiffList.isPresent()) {
deltaFiles.addAll(sstDiffList.get());
}
} catch (Exception exception) {
LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. " +
"It will fallback to full diff now.", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ public void testGetDeltaFilesWithDag(int numberOfFiles) throws IOException {
any(DifferSnapshotInfo.class),
any(DifferSnapshotInfo.class),
eq(diffDir))
).thenReturn(Lists.newArrayList(randomStrings));
).thenReturn(Optional.of(Lists.newArrayList(randomStrings)));

ReferenceCounted<OmSnapshot> rcFromSnapshot =
omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, snap1.toString());
Expand Down Expand Up @@ -497,7 +497,7 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles,
any(DifferSnapshotInfo.class),
any(DifferSnapshotInfo.class),
anyString()))
.thenReturn(Collections.emptyList());
.thenReturn(Optional.ofNullable(Collections.emptyList()));
}

ReferenceCounted<OmSnapshot> rcFromSnapshot =
Expand Down
Loading