diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 087b930f0980..d8f6c1b2abb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -371,7 +371,8 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
+    val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ < version)
+      .values.flatten.map { f =>
       f.localFileName -> f
     }.toMap
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 8684b0cdd7a0..f66ac0de8c6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -372,19 +372,19 @@ class RocksDBSuite extends SparkFunSuite {
     // Save SAME version again with different checkpoint files and load back again to verify
     // whether files were overwritten.
     val cpFiles1_ = Seq(
-      "sst-file1.sst" -> 10, // same SST file as before, should not get copied
+      "sst-file1.sst" -> 10, // same SST file as before, but same version, so should get copied
       "sst-file2.sst" -> 25, // new SST file with same name as before, but different length
       "sst-file3.sst" -> 30, // new SST file
       "other-file1" -> 100, // same non-SST file as before, should not get copied
       "other-file2" -> 210, // new non-SST file with same name as before, but different length
       "other-file3" -> 300, // new non-SST file
-      "archive/00001.log" -> 1000, // same log file as before, should not get copied
+      "archive/00001.log" -> 1000, // same log file as before and version, so should get copied
       "archive/00002.log" -> 2500, // new log file with same name as before, but different length
       "archive/00003.log" -> 3000 // new log file
     )
     saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
-    assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
-    assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
+    assert(numRemoteSSTFiles === 5, "shouldn't copy same files again") // 2 old + 3 new SST files
+    assert(numRemoteLogFiles === 5, "shouldn't copy same files again") // 2 old + 3 new log files
     loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)
 
     // Save another version and verify
@@ -394,8 +394,8 @@ class RocksDBSuite extends SparkFunSuite {
       "archive/00004.log" -> 4000
     )
     saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
-    assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
-    assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+    assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files
+    assert(numRemoteLogFiles === 6) // 1 new file over earlier 5 files
     loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)
 
     // Loading an older version should work