@@ -371,7 +371,8 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
+    val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ != version)
Contributor:
`_ < version`, to fit the code comment above.

Btw, my preference would actually be to invalidate the cache entries for a version once we figure out that version will be reprocessed (say, invalidating all cache entries whose version is greater than the parameter passed to RocksDB.load()). But I agree this is a broader fix that may not be necessary, since listing cache entries is only performed here.

Contributor Author:

Done. Yeah, I thought about this too, but listing happens only here and we probably don't expect this to happen too often, so I went with the point fix for now.
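The broader alternative discussed in this thread (not what the PR implements) can be sketched in isolation: on load, prune cache entries for any version that will be reprocessed. This is a hedged sketch; `versionToFiles` and `invalidateNewerThan` are hypothetical stand-ins, not names from the Spark codebase.

```scala
import scala.collection.mutable

object CacheInvalidationSketch {
  // Hypothetical stand-in for the version -> files cache
  // (versionToRocksDBFiles in the real code).
  def invalidateNewerThan(
      versionToFiles: mutable.Map[Long, Seq[String]],
      loadedVersion: Long): Unit = {
    // Any version strictly greater than the one just loaded will be
    // regenerated, so its cached file list is stale. Materialize the keys
    // first to avoid mutating the map while iterating over it.
    versionToFiles.keys.filter(_ > loadedVersion).toSeq
      .foreach(versionToFiles.remove)
  }

  def main(args: Array[String]): Unit = {
    val cache = mutable.Map(
      1L -> Seq("00001.sst"),
      2L -> Seq("00002.sst"),
      3L -> Seq("00003.sst"))
    // Loading version 1 means versions 2 and 3 will be reprocessed.
    invalidateNewerThan(cache, loadedVersion = 1L)
    assert(cache.keySet == Set(1L))
  }
}
```

As the comment notes, this would make every lookup path safe, at the cost of touching `load()`, whereas the merged fix only adjusts the single place where cache entries are listed.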

+      .values.flatten.map { f =>
         f.localFileName -> f
       }.toMap
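The effect of the fix can be illustrated standalone: when re-saving a version, that version's own cached entries must be excluded, or a same-named file from the previous attempt would wrongly be treated as already uploaded. A minimal sketch, assuming a hypothetical `ImmutableFile` case class in place of Spark's real per-file metadata (the real code uses `filterKeys`; `filter` is used here to avoid the Scala 2.13 deprecation):

```scala
// Hypothetical stand-in for the per-version file metadata the manager tracks.
case class ImmutableFile(localFileName: String, sizeBytes: Long)

object PrevFilesSketch {
  // Mirrors the fixed lookup: entries recorded for the version currently
  // being overwritten are ignored, so its files are uploaded fresh instead
  // of being wrongly treated as already present in DFS.
  def prevFilesToSizes(
      versionToFiles: Map[Long, Seq[ImmutableFile]],
      version: Long): Map[String, ImmutableFile] = {
    versionToFiles
      .filter { case (v, _) => v != version }
      .values.flatten
      .map(f => f.localFileName -> f)
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val cache = Map(
      1L -> Seq(ImmutableFile("sst-file1.sst", 10)),
      2L -> Seq(ImmutableFile("sst-file2.sst", 25)))
    // Re-saving version 2: only version 1's files count as reusable, so
    // version 2's stale entry cannot suppress its own re-upload.
    val prev = prevFilesToSizes(cache, version = 2L)
    assert(prev.keySet == Set("sst-file1.sst"))
  }
}
```

Without the exclusion, `prev` would also contain `sst-file2.sst`, and overwriting version 2 would skip uploading a file whose content may have changed.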

@@ -372,19 +372,19 @@ class RocksDBSuite extends SparkFunSuite {
// Save SAME version again with different checkpoint files and load back again to verify
// whether files were overwritten.
val cpFiles1_ = Seq(
-      "sst-file1.sst" -> 10, // same SST file as before, should not get copied
+      "sst-file1.sst" -> 10, // same SST file as before, but same version, so should get copied
"sst-file2.sst" -> 25, // new SST file with same name as before, but different length
"sst-file3.sst" -> 30, // new SST file
"other-file1" -> 100, // same non-SST file as before, should not get copied
"other-file2" -> 210, // new non-SST file with same name as before, but different length
"other-file3" -> 300, // new non-SST file
-      "archive/00001.log" -> 1000, // same log file as before, should not get copied
+      "archive/00001.log" -> 1000, // same log file as before and version, so should get copied
"archive/00002.log" -> 2500, // new log file with same name as before, but different length
"archive/00003.log" -> 3000 // new log file
)
saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
-    assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
-    assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
+    assert(numRemoteSSTFiles === 5, "shouldn't copy same files again") // 2 old + 3 new SST files
+    assert(numRemoteLogFiles === 5, "shouldn't copy same files again") // 2 old + 3 new log files
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)

// Save another version and verify
@@ -394,8 +394,8 @@ class RocksDBSuite extends SparkFunSuite {
"archive/00004.log" -> 4000
)
saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
-    assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
-    assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+    assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files
+    assert(numRemoteLogFiles === 6) // 1 new file over earlier 5 files
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)

// Loading an older version should work