Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@

import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.StringUtils;
Expand Down Expand Up @@ -49,4 +55,17 @@ public static Set<String> getSSTFilesForComparison(
.map(lfm -> new File(lfm.path(), lfm.fileName()).getPath())
.collect(Collectors.toCollection(HashSet::new));
}

public static Map<Object, String> getSSTFilesWithInodesForComparison(final ManagedRocksDB rocksDB, List<String> cfs) {
return getLiveSSTFilesForCFs(rocksDB, cfs).stream()
.map(lfm -> {
File sstFile = new File(lfm.path(), lfm.fileName());
try {
Object inode = Files.readAttributes(sstFile.toPath(), BasicFileAttributes.class).fileKey();
return new AbstractMap.SimpleEntry<>(inode, sstFile.getPath());
} catch (IOException e) {
throw new UncheckedIOException("Failed to read inode for " + sstFile.getPath(), e);
}
}).collect(Collectors.toMap(Map.Entry::getKey,Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -92,6 +93,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.file.PathUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -375,6 +377,14 @@ protected Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
tablesToLookUp);
}

@VisibleForTesting
protected Map<Object, String> getSSTFileMapForSnapshot(OmSnapshot snapshot,
List<String> tablesToLookUp) {
return RdbUtil.getSSTFilesWithInodesForComparison(((RDBStore)snapshot
.getMetadataManager().getStore()).getDb().getManagedRocksDb(),
tablesToLookUp);
}

/**
* Gets the report key for a particular index of snapshot diff job.
*/
Expand Down Expand Up @@ -1203,11 +1213,7 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
.getDb().getManagedRocksDb();
ManagedRocksDB toDB = ((RDBStore)toSnapshot.getMetadataManager().getStore())
.getDb().getManagedRocksDb();
Set<String> fromSnapshotFiles = getSSTFileListForSnapshot(fromSnapshot, tablesToLookUp);
Set<String> toSnapshotFiles = getSSTFileListForSnapshot(toSnapshot, tablesToLookUp);
Set<String> diffFiles = new HashSet<>();
diffFiles.addAll(fromSnapshotFiles);
diffFiles.addAll(toSnapshotFiles);
Set<String> diffFiles = getDiffFiles(fromSnapshot, toSnapshot, tablesToLookUp);
RocksDiffUtils.filterRelevantSstFiles(diffFiles, tablePrefixes, fromDB, toDB);
deltaFiles = Optional.of(diffFiles);
}
Expand All @@ -1217,6 +1223,27 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
toSnapshot.getSnapshotTableKey()));
}

private Set<String> getDiffFiles(OmSnapshot fromSnapshot, OmSnapshot toSnapshot, List<String> tablesToLookUp) {
Set<String> diffFiles;
try {
Map<Object, String> fromSnapshotFiles = getSSTFileMapForSnapshot(fromSnapshot, tablesToLookUp);
Map<Object, String> toSnapshotFiles = getSSTFileMapForSnapshot(toSnapshot, tablesToLookUp);
diffFiles = Stream.concat(
fromSnapshotFiles.entrySet().stream()
.filter(e -> !toSnapshotFiles.containsKey(e.getKey())),
toSnapshotFiles.entrySet().stream()
.filter(e -> !fromSnapshotFiles.containsKey(e.getKey())))
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
} catch (UncheckedIOException e) {
// In case of exception during inode read use all files
diffFiles = new HashSet<>();
diffFiles.addAll(getSSTFileListForSnapshot(fromSnapshot, tablesToLookUp));
diffFiles.addAll(getSSTFileListForSnapshot(toSnapshot, tablesToLookUp));
}
return diffFiles;
}

private void validateEstimatedKeyChangesAreInLimits(
SstFileSetReader sstFileReader
) throws RocksDBException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ private void setupMocksForRunningASnapDiff(
}

@Test
public void testGetDeltaFilesWithFullDiff() throws IOException {
public void testGetDeltaFilesWithFullDiff() throws IOException {
SnapshotDiffManager spy = spy(snapshotDiffManager);
UUID snap1 = UUID.randomUUID();
OmSnapshot fromSnapshot = getMockedOmSnapshot(snap1);
Expand All @@ -1545,20 +1545,28 @@ public void testGetDeltaFilesWithFullDiff() throws IOException {
Mockito.doAnswer(invocation -> {
OmSnapshot snapshot = invocation.getArgument(0);
if (snapshot == fromSnapshot) {
return Sets.newHashSet("1", "2", "3");
Map<Integer, String> inodeToFileMap = new HashMap<>();
inodeToFileMap.put(1, "1.sst");
inodeToFileMap.put(2, "2.sst");
inodeToFileMap.put(3, "3.sst");
return inodeToFileMap;
}
if (snapshot == toSnapshot) {
return Sets.newHashSet("3", "4", "5");
Map<Integer, String> inodeToFileMap = new HashMap<>();
inodeToFileMap.put(1, "10.sst");
inodeToFileMap.put(2, "20.sst");
inodeToFileMap.put(4, "4.sst");
return inodeToFileMap;
}
return Sets.newHashSet("6", "7", "8");
}).when(spy).getSSTFileListForSnapshot(Mockito.any(OmSnapshot.class),
return null;
}).when(spy).getSSTFileMapForSnapshot(Mockito.any(OmSnapshot.class),
Mockito.anyList());
doNothing().when(spy).recordActivity(any(), any());
doNothing().when(spy).updateProgress(anyString(), anyDouble());
String diffJobKey = snap1 + DELIMITER + snap2;
Set<String> deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, Collections.emptyList(), snapshotInfo,
snapshotInfo, true, Collections.emptyMap(), null, diffJobKey);
Assertions.assertEquals(Sets.newHashSet("1", "2", "3", "4", "5"), deltaFiles);
Assertions.assertEquals(Sets.newHashSet("3.sst", "4.sst"), deltaFiles);
}

@Test
Expand Down
Loading