-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider #41099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 10 commits
Commits
Show all changes
42 commits
Select commit
Hold shift + click to select a range
da54bb5
initial implementation
chaoqin-li1123 5bda37f
add conf
chaoqin-li1123 8bc8552
remove unused import
chaoqin-li1123 be7846b
test correctness with changelog checkpointing
chaoqin-li1123 0512806
add unit test and fix bug
chaoqin-li1123 974a47e
clean up
chaoqin-li1123 2d81dc2
address comment
chaoqin-li1123 9df1340
fix build
chaoqin-li1123 1b0f94c
respect minDeltasForSnapshot in changelog checkpointing
chaoqin-li1123 bf30cf1
fix checkpoint interval bug and address comment
chaoqin-li1123 19b8355
address comments
chaoqin-li1123 4ff442f
clean up conf for changelog checkpointing
chaoqin-li1123 b3cc436
enable streaming aggregation suite to run with rocksdb
chaoqin-li1123 0ef2fc9
Merge branch 'master' of github.com:chaoqin-li1123/spark into changelog
chaoqin-li1123 e59d43f
clean up
chaoqin-li1123 1e46adc
add doc
chaoqin-li1123 5f53f49
address comments
chaoqin-li1123 5910fb7
address comments
chaoqin-li1123 0ee93c1
Merge branch 'master' of github.com:chaoqin-li1123/spark into changelog
chaoqin-li1123 7c65cac
add comments
chaoqin-li1123 b2ead71
address comments
chaoqin-li1123 36d9ae2
address comments
chaoqin-li1123 4aa2605
simplify
chaoqin-li1123 ff4cff9
add doc and comments
chaoqin-li1123 82e7168
add backward compatibility integration test
chaoqin-li1123 4109c29
comment out tests
chaoqin-li1123 573e0e9
comment out tests
chaoqin-li1123 fc8c1bd
comment out tests
chaoqin-li1123 6480621
move tests around to pass ci
chaoqin-li1123 590f21c
move tests around to pass ci
chaoqin-li1123 bb58556
move tests around to pass ci
chaoqin-li1123 14d7b91
move tests around to pass ci
chaoqin-li1123 99f5e0a
fix nits
chaoqin-li1123 b1d3809
improve doc
chaoqin-li1123 050f214
fix test nits
chaoqin-li1123 f723840
use NextIterator
chaoqin-li1123 da7aa99
make rocksdb state store suite use sqlconf in shared spark session
chaoqin-li1123 4f9b0a7
address testing comments
chaoqin-li1123 7d52ed5
Merge branch 'master' of github.com:chaoqin-li1123/spark into changelog
chaoqin-li1123 91d0075
add after each
chaoqin-li1123 5732fbd
fix test failure
chaoqin-li1123 6cb6d0b
Merge branch 'master' of github.com:chaoqin-li1123/spark into changelog
chaoqin-li1123 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,15 @@ class RocksDB( | |
| hadoopConf: Configuration = new Configuration, | ||
| loggingId: String = "") extends Logging { | ||
|
|
||
| case class RocksDBCheckpoint(checkpointDir: File, version: Long, numKeys: Long) { | ||
|
||
| def close(): Unit = { | ||
| silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version") | ||
| } | ||
| } | ||
|
|
||
| @volatile private var latestCheckpoint: Option[RocksDBCheckpoint] = None | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| @volatile private var lastCheckpointVersion = 0L | ||
|
|
||
| RocksDBLoader.loadLibrary() | ||
|
|
||
| // Java wrapper objects linking to native RocksDB objects | ||
|
|
@@ -109,13 +118,15 @@ class RocksDB( | |
| private val nativeStats = dbOptions.statistics() | ||
|
|
||
| private val workingDir = createTempDir("workingDir") | ||
| private val fileManager = new RocksDBFileManager( | ||
| dfsRootDir, createTempDir("fileManager"), hadoopConf, loggingId = loggingId) | ||
| private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"), | ||
| hadoopConf, conf.compressionCodec, loggingId = loggingId) | ||
| private val byteArrayPair = new ByteArrayPair() | ||
| private val commitLatencyMs = new mutable.HashMap[String, Long]() | ||
| private val acquireLock = new Object | ||
|
|
||
| @volatile private var db: NativeRocksDB = _ | ||
| @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None | ||
| private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing | ||
| @volatile private var loadedVersion = -1L // -1 = nothing valid is loaded | ||
| @volatile private var numKeysOnLoadedVersion = 0L | ||
| @volatile private var numKeysOnWritingVersion = 0L | ||
|
|
@@ -129,14 +140,15 @@ class RocksDB( | |
| * Note that this will copy all the necessary file from DFS to local disk as needed, | ||
| * and possibly restart the native RocksDB instance. | ||
| */ | ||
| def load(version: Long): RocksDB = { | ||
| def load(version: Long, readOnly: Boolean = false): RocksDB = { | ||
| assert(version >= 0) | ||
| acquire() | ||
| logInfo(s"Loading $version") | ||
| try { | ||
| if (loadedVersion != version) { | ||
| closeDB() | ||
| val metadata = fileManager.loadCheckpointFromDfs(version, workingDir) | ||
| val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version) | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) | ||
| openDB() | ||
|
|
||
| val numKeys = if (!conf.trackTotalNumberOfRows) { | ||
|
|
@@ -150,8 +162,27 @@ class RocksDB( | |
| metadata.numKeys | ||
| } | ||
| numKeysOnWritingVersion = numKeys | ||
| numKeysOnLoadedVersion = numKeys | ||
|
|
||
| // Replay change log from the last snapshot to the loaded version. | ||
| // This will be noop if changelog checkpointing is disabled. | ||
| for (v <- latestSnapshotVersion + 1 to version) { | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| var changelogReader: StateStoreChangelogReader = null | ||
| try { | ||
| changelogReader = fileManager.getChangelogReader(v) | ||
| while (changelogReader.hasNext) { | ||
| val byteArrayPair = changelogReader.next() | ||
| if (byteArrayPair.value != null) { | ||
| put(byteArrayPair.key, byteArrayPair.value) | ||
| } else { | ||
| remove(byteArrayPair.key) | ||
| } | ||
| } | ||
| } finally { | ||
| if (changelogReader != null) changelogReader.close() | ||
| } | ||
| } | ||
| // After changelog replay the numKeysOnWritingVersion will be updated to | ||
| // the correct number of keys in the loaded version. | ||
| numKeysOnLoadedVersion = numKeysOnWritingVersion | ||
| loadedVersion = version | ||
| fileManagerMetrics = fileManager.latestLoadCheckpointMetrics | ||
| } | ||
|
|
@@ -164,6 +195,9 @@ class RocksDB( | |
| loadedVersion = -1 // invalidate loaded data | ||
| throw t | ||
| } | ||
| if (enableChangelogCheckpointing && !readOnly) { | ||
| changelogWriter = Some(fileManager.getChangeLogWriter(version + 1)) | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| this | ||
| } | ||
|
|
||
|
|
@@ -187,6 +221,7 @@ class RocksDB( | |
| } | ||
| } | ||
| db.put(writeOptions, key, value) | ||
| changelogWriter.foreach(_.put(key, value)) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -201,6 +236,7 @@ class RocksDB( | |
| } | ||
| } | ||
| db.delete(writeOptions, key) | ||
| changelogWriter.foreach(_.delete(key)) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -286,44 +322,49 @@ class RocksDB( | |
| */ | ||
| def commit(): Long = { | ||
| val newVersion = loadedVersion + 1 | ||
| val checkpointDir = createTempDir("checkpoint") | ||
| var rocksDBBackgroundThreadPaused = false | ||
| try { | ||
| // Make sure the directory does not exist. Native RocksDB fails if the directory to | ||
| // checkpoint exists. | ||
| Utils.deleteRecursively(checkpointDir) | ||
|
|
||
| logInfo(s"Flushing updates for $newVersion") | ||
| val flushTimeMs = timeTakenMs { db.flush(flushOptions) } | ||
|
|
||
| val compactTimeMs = if (conf.compactOnCommit) { | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| logInfo("Compacting") | ||
| timeTakenMs { db.compactRange() } | ||
| } else 0 | ||
|
|
||
| logInfo("Pausing background work") | ||
| val pauseTimeMs = timeTakenMs { | ||
| db.pauseBackgroundWork() // To avoid files being changed while committing | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| rocksDBBackgroundThreadPaused = true | ||
| } | ||
|
|
||
| logInfo(s"Creating checkpoint for $newVersion in $checkpointDir") | ||
| val checkpointTimeMs = timeTakenMs { | ||
| val cp = Checkpoint.create(db) | ||
| cp.createCheckpoint(checkpointDir.toString) | ||
| var flushTimeMs = 0L | ||
| var checkpointTimeMs = 0L | ||
| if (shouldCreateSnapshot()) { | ||
| flushTimeMs = timeTakenMs { db.flush(flushOptions) } | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| checkpointTimeMs = timeTakenMs { | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| val checkpointDir = createTempDir("checkpoint") | ||
| // Make sure the directory does not exist. Native RocksDB fails if the directory to | ||
| // checkpoint exists. | ||
| Utils.deleteRecursively(checkpointDir) | ||
| val cp = Checkpoint.create(db) | ||
| cp.createCheckpoint(checkpointDir.toString) | ||
| synchronized { | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| latestCheckpoint.foreach(_.close()) | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| latestCheckpoint = Some( | ||
| RocksDBCheckpoint(checkpointDir, newVersion, numKeysOnWritingVersion)) | ||
| lastCheckpointVersion = newVersion | ||
| } | ||
| } | ||
| } | ||
|
|
||
| logInfo(s"Syncing checkpoint for $newVersion to DFS") | ||
| val fileSyncTimeMs = timeTakenMs { | ||
| fileManager.saveCheckpointToDfs(checkpointDir, newVersion, numKeysOnWritingVersion) | ||
| if (enableChangelogCheckpointing) { | ||
| try { | ||
| assert(changelogWriter.isDefined) | ||
| changelogWriter.foreach(_.commit()) | ||
| } finally { | ||
| changelogWriter = None | ||
| } | ||
| } else { | ||
| assert(changelogWriter.isEmpty) | ||
| uploadSnapshot() | ||
| } | ||
| } | ||
|
|
||
| numKeysOnLoadedVersion = numKeysOnWritingVersion | ||
| loadedVersion = newVersion | ||
| fileManagerMetrics = fileManager.latestSaveCheckpointMetrics | ||
| commitLatencyMs ++= Map( | ||
| "flush" -> flushTimeMs, | ||
| "compact" -> compactTimeMs, | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "pause" -> pauseTimeMs, | ||
| "checkpoint" -> checkpointTimeMs, | ||
| "fileSync" -> fileSyncTimeMs | ||
| ) | ||
|
|
@@ -334,25 +375,59 @@ class RocksDB( | |
| loadedVersion = -1 // invalidate loaded version | ||
| throw t | ||
| } finally { | ||
| if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork() | ||
| silentDeleteRecursively(checkpointDir, s"committing $newVersion") | ||
| // reset resources as either 1) we already pushed the changes and it has been committed or | ||
| // 2) commit has failed and the current version is "invalidated". | ||
| release() | ||
| } | ||
| } | ||
|
|
||
| private def shouldCreateSnapshot(): Boolean = { | ||
| if (enableChangelogCheckpointing) { | ||
| assert(changelogWriter.isDefined) | ||
| val newVersion = loadedVersion + 1 | ||
| newVersion - lastCheckpointVersion >= conf.minDeltasForSnapshot || | ||
| changelogWriter.get.size > 10000 | ||
| } else true | ||
| } | ||
|
|
||
| private def uploadSnapshot(): Unit = { | ||
| val localCheckpoint = synchronized { | ||
| val checkpoint = latestCheckpoint | ||
| latestCheckpoint = None | ||
| checkpoint | ||
| } | ||
| localCheckpoint match { | ||
| case Some(RocksDBCheckpoint(localDir, version, numKeys)) => | ||
| try { | ||
| val uploadTime = timeTakenMs { | ||
| fileManager.saveCheckpointToDfs(localDir, version, numKeys) | ||
| fileManagerMetrics = fileManager.latestSaveCheckpointMetrics | ||
| } | ||
| logInfo(s"Upload snapshot of version $version, time taken: $uploadTime ms") | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } finally { | ||
| localCheckpoint.foreach(_.close()) | ||
| } | ||
| case _ => | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Drop uncommitted changes, and roll back to previous version. | ||
| */ | ||
| def rollback(): Unit = { | ||
| numKeysOnWritingVersion = numKeysOnLoadedVersion | ||
| loadedVersion = -1L | ||
| changelogWriter.foreach(_.abort()) | ||
| // Make sure changelogWriter gets recreated next time. | ||
| changelogWriter = None | ||
| release() | ||
| logInfo(s"Rolled back to $loadedVersion") | ||
| } | ||
|
|
||
| def cleanup(): Unit = { | ||
| if (enableChangelogCheckpointing) { | ||
| uploadSnapshot() | ||
| } | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| val cleanupTime = timeTakenMs { | ||
| fileManager.deleteOldVersions(conf.minVersionsToRetain) | ||
| } | ||
|
|
@@ -369,6 +444,7 @@ class RocksDB( | |
| flushOptions.close() | ||
| dbOptions.close() | ||
| dbLogger.close() | ||
| latestCheckpoint.foreach(_.close()) | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| silentDeleteRecursively(localRootDir, "closing RocksDB") | ||
| } catch { | ||
| case e: Exception => | ||
|
|
@@ -550,7 +626,9 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null) | |
| */ | ||
| case class RocksDBConf( | ||
| minVersionsToRetain: Int, | ||
| minDeltasForSnapshot: Int, | ||
| compactOnCommit: Boolean, | ||
| enableChangelogCheckpointing: Boolean, | ||
| blockSizeKB: Long, | ||
| blockCacheSizeMB: Long, | ||
| lockAcquireTimeoutMs: Long, | ||
|
|
@@ -563,7 +641,8 @@ case class RocksDBConf( | |
| boundedMemoryUsage: Boolean, | ||
| totalMemoryUsageMB: Long, | ||
| writeBufferCacheRatio: Double, | ||
| highPriorityPoolRatio: Double) | ||
| highPriorityPoolRatio: Double, | ||
| compressionCodec: String) | ||
|
|
||
| object RocksDBConf { | ||
| /** Common prefix of all confs in SQLConf that affects RocksDB */ | ||
|
|
@@ -585,6 +664,8 @@ object RocksDBConf { | |
|
|
||
| // Configuration that specifies whether to compact the RocksDB data every time data is committed | ||
| private val COMPACT_ON_COMMIT_CONF = SQLConfEntry("compactOnCommit", "false") | ||
| private val ENABLE_CHANGELOG_CHECKPOINTING_CONF = SQLConfEntry( | ||
| "enableChangelogCheckpointing", "false") | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private val BLOCK_SIZE_KB_CONF = SQLConfEntry("blockSizeKB", "4") | ||
| private val BLOCK_CACHE_SIZE_MB_CONF = SQLConfEntry("blockCacheSizeMB", "8") | ||
| // See SPARK-42794 for details. | ||
|
|
@@ -705,7 +786,9 @@ object RocksDBConf { | |
|
|
||
| RocksDBConf( | ||
| storeConf.minVersionsToRetain, | ||
| storeConf.minDeltasForSnapshot, | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| getBooleanConf(COMPACT_ON_COMMIT_CONF), | ||
| getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF), | ||
| getPositiveLongConf(BLOCK_SIZE_KB_CONF), | ||
| getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF), | ||
| getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF), | ||
|
|
@@ -718,7 +801,8 @@ object RocksDBConf { | |
| getBooleanConf(BOUNDED_MEMORY_USAGE_CONF), | ||
| getLongConf(MAX_MEMORY_USAGE_MB_CONF), | ||
| getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF), | ||
| getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF)) | ||
| getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF), | ||
| storeConf.compressionCodec) | ||
| } | ||
|
|
||
| def apply(): RocksDBConf = apply(new StateStoreConf()) | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.