Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ class RocksDBFileManager(
// Get the immutable files used in previous versions, as some of those uploaded files can be
// reused for this version
logInfo(s"Saving RocksDB files to DFS for $version")
val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ < version)
.values.flatten.map { f =>
f.localFileName -> f
}.toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,19 +372,19 @@ class RocksDBSuite extends SparkFunSuite {
// Save SAME version again with different checkpoint files and load back again to verify
// whether files were overwritten.
val cpFiles1_ = Seq(
"sst-file1.sst" -> 10, // same SST file as before, should not get copied
"sst-file1.sst" -> 10, // same SST file as before, but same version, so should get copied
"sst-file2.sst" -> 25, // new SST file with same name as before, but different length
"sst-file3.sst" -> 30, // new SST file
"other-file1" -> 100, // same non-SST file as before, should not get copied
"other-file2" -> 210, // new non-SST file with same name as before, but different length
"other-file3" -> 300, // new non-SST file
"archive/00001.log" -> 1000, // same log file as before, should not get copied
"archive/00001.log" -> 1000, // same log file as before and version, so should get copied
"archive/00002.log" -> 2500, // new log file with same name as before, but different length
"archive/00003.log" -> 3000 // new log file
)
saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
assert(numRemoteSSTFiles === 5, "shouldn't copy same files again") // 2 old + 3 new SST files
assert(numRemoteLogFiles === 5, "shouldn't copy same files again") // 2 old + 3 new log files
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)

// Save another version and verify
Expand All @@ -394,8 +394,8 @@ class RocksDBSuite extends SparkFunSuite {
"archive/00004.log" -> 4000
)
saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files
assert(numRemoteLogFiles === 6) // 1 new file over earlier 5 files
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)

// Loading an older version should work
Expand Down