-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider #41099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
da54bb5
5bda37f
8bc8552
be7846b
0512806
974a47e
2d81dc2
9df1340
1b0f94c
bf30cf1
19b8355
4ff442f
b3cc436
0ef2fc9
e59d43f
1e46adc
5f53f49
5910fb7
0ee93c1
7c65cac
b2ead71
36d9ae2
4aa2605
ff4cff9
82e7168
4109c29
573e0e9
fc8c1bd
6480621
590f21c
bb58556
14d7b91
99f5e0a
b1d3809
050f214
f723840
da7aa99
4f9b0a7
7d52ed5
91d0075
5732fbd
6cb6d0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,14 @@ class RocksDB( | |
| hadoopConf: Configuration = new Configuration, | ||
| loggingId: String = "") extends Logging { | ||
|
|
||
| case class RocksDBCheckpoint(checkpointDir: File, version: Long, numKeys: Long) { | ||
|
||
| def close(): Unit = { | ||
| silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version") | ||
| } | ||
| } | ||
|
|
||
| @volatile private var latestCheckpoint: Option[RocksDBCheckpoint] = None | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| RocksDBLoader.loadLibrary() | ||
|
|
||
| // Java wrapper objects linking to native RocksDB objects | ||
|
|
@@ -109,13 +117,15 @@ class RocksDB( | |
| private val nativeStats = dbOptions.statistics() | ||
|
|
||
| private val workingDir = createTempDir("workingDir") | ||
| private val fileManager = new RocksDBFileManager( | ||
| dfsRootDir, createTempDir("fileManager"), hadoopConf, loggingId = loggingId) | ||
| private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"), | ||
| hadoopConf, conf.compressionCodec, loggingId = loggingId) | ||
| private val byteArrayPair = new ByteArrayPair() | ||
| private val commitLatencyMs = new mutable.HashMap[String, Long]() | ||
| private val acquireLock = new Object | ||
|
|
||
| @volatile private var db: NativeRocksDB = _ | ||
| @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None | ||
| private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing | ||
| @volatile private var loadedVersion = -1L // -1 = nothing valid is loaded | ||
| @volatile private var numKeysOnLoadedVersion = 0L | ||
| @volatile private var numKeysOnWritingVersion = 0L | ||
|
|
@@ -129,14 +139,15 @@ class RocksDB( | |
| * Note that this will copy all the necessary file from DFS to local disk as needed, | ||
| * and possibly restart the native RocksDB instance. | ||
| */ | ||
| def load(version: Long): RocksDB = { | ||
| def load(version: Long, readOnly: Boolean = false): RocksDB = { | ||
| assert(version >= 0) | ||
| acquire() | ||
| logInfo(s"Loading $version") | ||
| try { | ||
| if (loadedVersion != version) { | ||
| closeDB() | ||
| val metadata = fileManager.loadCheckpointFromDfs(version, workingDir) | ||
| val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version) | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir) | ||
| openDB() | ||
|
|
||
| val numKeys = if (!conf.trackTotalNumberOfRows) { | ||
|
|
@@ -150,8 +161,27 @@ class RocksDB( | |
| metadata.numKeys | ||
| } | ||
| numKeysOnWritingVersion = numKeys | ||
| numKeysOnLoadedVersion = numKeys | ||
|
|
||
| // Replay change log from the last snapshot to the loaded version. | ||
| // This will be noop if changelog checkpointing is disabled. | ||
| for (v <- latestSnapshotVersion + 1 to version) { | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| var changelogReader: StateStoreChangelogReader = null | ||
| try { | ||
| changelogReader = fileManager.getChangelogReader(v) | ||
| while (changelogReader.hasNext) { | ||
| val byteArrayPair = changelogReader.next() | ||
| if (byteArrayPair.value != null) { | ||
| put(byteArrayPair.key, byteArrayPair.value) | ||
| } else { | ||
| remove(byteArrayPair.key) | ||
| } | ||
| } | ||
| } finally { | ||
| if (changelogReader != null) changelogReader.close() | ||
| } | ||
| } | ||
| // After changelog replay the numKeysOnWritingVersion will be updated to | ||
| // the correct number of keys in the loaded version. | ||
| numKeysOnLoadedVersion = numKeysOnWritingVersion | ||
| loadedVersion = version | ||
| fileManagerMetrics = fileManager.latestLoadCheckpointMetrics | ||
| } | ||
|
|
@@ -164,6 +194,9 @@ class RocksDB( | |
| loadedVersion = -1 // invalidate loaded data | ||
| throw t | ||
| } | ||
| if (enableChangelogCheckpointing && !readOnly) { | ||
| changelogWriter = Some(fileManager.getChangeLogWriter(version + 1)) | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| this | ||
| } | ||
|
|
||
|
|
@@ -187,6 +220,7 @@ class RocksDB( | |
| } | ||
| } | ||
| db.put(writeOptions, key, value) | ||
| changelogWriter.foreach(_.put(key, value)) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -201,6 +235,7 @@ class RocksDB( | |
| } | ||
| } | ||
| db.delete(writeOptions, key) | ||
| changelogWriter.foreach(_.delete(key)) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -286,44 +321,48 @@ class RocksDB( | |
| */ | ||
| def commit(): Long = { | ||
| val newVersion = loadedVersion + 1 | ||
| val checkpointDir = createTempDir("checkpoint") | ||
| var rocksDBBackgroundThreadPaused = false | ||
| try { | ||
| // Make sure the directory does not exist. Native RocksDB fails if the directory to | ||
| // checkpoint exists. | ||
| Utils.deleteRecursively(checkpointDir) | ||
|
|
||
| logInfo(s"Flushing updates for $newVersion") | ||
| val flushTimeMs = timeTakenMs { db.flush(flushOptions) } | ||
|
|
||
| val compactTimeMs = if (conf.compactOnCommit) { | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| logInfo("Compacting") | ||
| timeTakenMs { db.compactRange() } | ||
| } else 0 | ||
|
|
||
| logInfo("Pausing background work") | ||
| val pauseTimeMs = timeTakenMs { | ||
| db.pauseBackgroundWork() // To avoid files being changed while committing | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| rocksDBBackgroundThreadPaused = true | ||
| } | ||
|
|
||
| logInfo(s"Creating checkpoint for $newVersion in $checkpointDir") | ||
| val checkpointTimeMs = timeTakenMs { | ||
| val cp = Checkpoint.create(db) | ||
| cp.createCheckpoint(checkpointDir.toString) | ||
| var flushTimeMs = 0L | ||
| var checkpointTimeMs = 0L | ||
| if (shouldCreateSnapshot()) { | ||
| flushTimeMs = timeTakenMs { db.flush(flushOptions) } | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| checkpointTimeMs = timeTakenMs { | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| val checkpointDir = createTempDir("checkpoint") | ||
| // Make sure the directory does not exist. Native RocksDB fails if the directory to | ||
| // checkpoint exists. | ||
| Utils.deleteRecursively(checkpointDir) | ||
| val cp = Checkpoint.create(db) | ||
| cp.createCheckpoint(checkpointDir.toString) | ||
| synchronized { | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| latestCheckpoint.foreach(_.close()) | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| latestCheckpoint = Some( | ||
| RocksDBCheckpoint(checkpointDir, newVersion, numKeysOnWritingVersion)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| logInfo(s"Syncing checkpoint for $newVersion to DFS") | ||
| val fileSyncTimeMs = timeTakenMs { | ||
| fileManager.saveCheckpointToDfs(checkpointDir, newVersion, numKeysOnWritingVersion) | ||
| if (enableChangelogCheckpointing) { | ||
| try { | ||
| assert(changelogWriter.isDefined) | ||
| changelogWriter.foreach(_.commit()) | ||
| } finally { | ||
| changelogWriter = None | ||
| } | ||
| } else { | ||
| assert(changelogWriter.isEmpty) | ||
| uploadSnapshot() | ||
| } | ||
| } | ||
|
|
||
| numKeysOnLoadedVersion = numKeysOnWritingVersion | ||
| loadedVersion = newVersion | ||
| fileManagerMetrics = fileManager.latestSaveCheckpointMetrics | ||
| commitLatencyMs ++= Map( | ||
| "flush" -> flushTimeMs, | ||
| "compact" -> compactTimeMs, | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "pause" -> pauseTimeMs, | ||
| "checkpoint" -> checkpointTimeMs, | ||
| "fileSync" -> fileSyncTimeMs | ||
| ) | ||
|
|
@@ -334,25 +373,59 @@ class RocksDB( | |
| loadedVersion = -1 // invalidate loaded version | ||
| throw t | ||
| } finally { | ||
| if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork() | ||
| silentDeleteRecursively(checkpointDir, s"committing $newVersion") | ||
| // reset resources as either 1) we already pushed the changes and it has been committed or | ||
| // 2) commit has failed and the current version is "invalidated". | ||
| release() | ||
| } | ||
| } | ||
|
|
||
| private def shouldCreateSnapshot(): Boolean = { | ||
| if (enableChangelogCheckpointing) { | ||
| assert(changelogWriter.isDefined) | ||
| val newVersion = loadedVersion + 1 | ||
| newVersion - fileManager.getLastUploadedSnapshotVersion() >= conf.minDeltasForSnapshot || | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| changelogWriter.get.size > 1000 | ||
|
||
| } else true | ||
| } | ||
|
|
||
| private def uploadSnapshot(): Unit = { | ||
| val localCheckpoint = synchronized { | ||
| val checkpoint = latestCheckpoint | ||
| latestCheckpoint = None | ||
| checkpoint | ||
| } | ||
| localCheckpoint match { | ||
| case Some(RocksDBCheckpoint(localDir, version, numKeys)) => | ||
| try { | ||
| val uploadTime = timeTakenMs { | ||
| fileManager.saveCheckpointToDfs(localDir, version, numKeys) | ||
| fileManagerMetrics = fileManager.latestSaveCheckpointMetrics | ||
| } | ||
| logInfo(s"Upload snapshot of version $version, time taken: $uploadTime ms") | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } finally { | ||
| localCheckpoint.foreach(_.close()) | ||
| } | ||
| case _ => | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Drop uncommitted changes, and roll back to previous version. | ||
| */ | ||
| def rollback(): Unit = { | ||
| numKeysOnWritingVersion = numKeysOnLoadedVersion | ||
| loadedVersion = -1L | ||
| changelogWriter.foreach(_.abort()) | ||
| // Make sure changelogWriter gets recreated next time. | ||
| changelogWriter = None | ||
| release() | ||
| logInfo(s"Rolled back to $loadedVersion") | ||
| } | ||
|
|
||
| def cleanup(): Unit = { | ||
| if (enableChangelogCheckpointing) { | ||
| uploadSnapshot() | ||
| } | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| val cleanupTime = timeTakenMs { | ||
| fileManager.deleteOldVersions(conf.minVersionsToRetain) | ||
| } | ||
|
|
@@ -369,6 +442,7 @@ class RocksDB( | |
| flushOptions.close() | ||
| dbOptions.close() | ||
| dbLogger.close() | ||
| latestCheckpoint.foreach(_.close()) | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| silentDeleteRecursively(localRootDir, "closing RocksDB") | ||
| } catch { | ||
| case e: Exception => | ||
|
|
@@ -550,7 +624,9 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null) | |
| */ | ||
| case class RocksDBConf( | ||
| minVersionsToRetain: Int, | ||
| minDeltasForSnapshot: Int, | ||
| compactOnCommit: Boolean, | ||
| enableChangelogCheckpointing: Boolean, | ||
| blockSizeKB: Long, | ||
| blockCacheSizeMB: Long, | ||
| lockAcquireTimeoutMs: Long, | ||
|
|
@@ -563,7 +639,8 @@ case class RocksDBConf( | |
| boundedMemoryUsage: Boolean, | ||
| totalMemoryUsageMB: Long, | ||
| writeBufferCacheRatio: Double, | ||
| highPriorityPoolRatio: Double) | ||
| highPriorityPoolRatio: Double, | ||
| compressionCodec: String) | ||
|
|
||
| object RocksDBConf { | ||
| /** Common prefix of all confs in SQLConf that affects RocksDB */ | ||
|
|
@@ -585,6 +662,8 @@ object RocksDBConf { | |
|
|
||
| // Configuration that specifies whether to compact the RocksDB data every time data is committed | ||
| private val COMPACT_ON_COMMIT_CONF = SQLConfEntry("compactOnCommit", "false") | ||
| private val ENABLE_CHANGELOG_CHECKPOINTING_CONF = SQLConfEntry( | ||
| "enableChangelogCheckpointing", "false") | ||
chaoqin-li1123 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private val BLOCK_SIZE_KB_CONF = SQLConfEntry("blockSizeKB", "4") | ||
| private val BLOCK_CACHE_SIZE_MB_CONF = SQLConfEntry("blockCacheSizeMB", "8") | ||
| // See SPARK-42794 for details. | ||
|
|
@@ -705,7 +784,9 @@ object RocksDBConf { | |
|
|
||
| RocksDBConf( | ||
| storeConf.minVersionsToRetain, | ||
| storeConf.minDeltasForSnapshot, | ||
chaoqin-li1123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| getBooleanConf(COMPACT_ON_COMMIT_CONF), | ||
| getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF), | ||
| getPositiveLongConf(BLOCK_SIZE_KB_CONF), | ||
| getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF), | ||
| getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF), | ||
|
|
@@ -718,7 +799,8 @@ object RocksDBConf { | |
| getBooleanConf(BOUNDED_MEMORY_USAGE_CONF), | ||
| getLongConf(MAX_MEMORY_USAGE_MB_CONF), | ||
| getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF), | ||
| getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF)) | ||
| getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF), | ||
| storeConf.compressionCodec) | ||
| } | ||
|
|
||
| def apply(): RocksDBConf = apply(new StateStoreConf()) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.