[SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider #41099
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution.streaming.state

-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.util.Locale
 import javax.annotation.concurrent.GuardedBy
@@ -56,6 +56,15 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "") extends Logging {

+  case class RocksDBCheckpoint(checkpointDir: File, version: Long, numKeys: Long) {
+    def close(): Unit = {
+      silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
+    }
+  }
+
+  @volatile private var latestCheckpoint: Option[RocksDBCheckpoint] = None
+  @volatile private var lastCheckpointVersion = 0L
+
   RocksDBLoader.loadLibrary()

   // Java wrapper objects linking to native RocksDB objects
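These new fields set up a producer/consumer handoff: the task thread publishes a local checkpoint at commit time, and the maintenance thread later takes ownership of it for upload (see uploadSnapshot() further down). A minimal sketch of that pattern, with all names invented for illustration:

    // Condensed model of the handoff implemented in this file: publish() is
    // commit() storing latestCheckpoint; take() is uploadSnapshot() swapping
    // in None under the same lock, so each snapshot is uploaded exactly once.
    class CheckpointSlot[T] {
      private var slot: Option[T] = None
      def publish(t: T): Unit = synchronized { slot = Some(t) }
      def take(): Option[T] = synchronized { val t = slot; slot = None; t }
    }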
@@ -109,13 +118,15 @@ class RocksDB(
   private val nativeStats = dbOptions.statistics()

   private val workingDir = createTempDir("workingDir")
-  private val fileManager = new RocksDBFileManager(
-    dfsRootDir, createTempDir("fileManager"), hadoopConf, loggingId = loggingId)
+  private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
+    hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()
   private val acquireLock = new Object

   @volatile private var db: NativeRocksDB = _
+  @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
+  private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
@@ -129,17 +140,36 @@ class RocksDB(
    * Note that this will copy all the necessary files from DFS to local disk as needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(version: Long): RocksDB = {
+  def load(version: Long, readOnly: Boolean = false): RocksDB = {
     assert(version >= 0)
     acquire()
     logInfo(s"Loading $version")
     try {
       if (loadedVersion != version) {
         closeDB()
-        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        var metadata: RocksDBCheckpointMetadata = null
+        // If changelog checkpointing has never been enabled, we should be able to load
+        // any version of the snapshot.
+        if (!enableChangelogCheckpointing) {
+          try {
+            metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+            loadedVersion = version
+          } catch {
+            // It is possible that changelog checkpointing was enabled during the last query
+            // run and this version of the snapshot is unavailable; in that case fall back to
+            // loading the latest available snapshot and replaying the changelog.
+            case _: FileNotFoundException => loadedVersion = -1
+          }
+        }
+
+        if (loadedVersion != version) {
+          val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
+          metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
+          loadedVersion = latestSnapshotVersion
+        }
         openDB()

-        val numKeys = if (!conf.trackTotalNumberOfRows) {
+        numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
           // we don't track the total number of rows - discard the number being tracked
           -1L
         } else if (metadata.numKeys < 0) {
@@ -149,10 +179,10 @@ class RocksDB(
         } else {
           metadata.numKeys
         }
-        numKeysOnWritingVersion = numKeys
-        numKeysOnLoadedVersion = numKeys
-
-        loadedVersion = version
+        if (loadedVersion != version) replayChangelog(version)
+        // After changelog replay, numKeysOnWritingVersion will have been updated to
+        // the correct number of keys in the loaded version.
+        numKeysOnLoadedVersion = numKeysOnWritingVersion
         fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
       }
       if (conf.resetStatsOnLoad) {
@@ -164,9 +194,36 @@ class RocksDB(
       loadedVersion = -1  // invalidate loaded data
       throw t
     }
+    if (enableChangelogCheckpointing && !readOnly) {
+      changelogWriter = Some(fileManager.getChangeLogWriter(version + 1))
+    }
     this
   }

+  /**
+   * Replay the changelog from the loaded version to the target version.
+   */
+  private def replayChangelog(endVersion: Long): Unit = {
+    // This is a no-op if changelog checkpointing is disabled.
+    for (v <- loadedVersion + 1 to endVersion) {
+      var changelogReader: StateStoreChangelogReader = null
+      try {
+        changelogReader = fileManager.getChangelogReader(v)
+        while (changelogReader.hasNext) {
+          val byteArrayPair = changelogReader.next()
+          if (byteArrayPair.value != null) {
+            put(byteArrayPair.key, byteArrayPair.value)
+          } else {
+            remove(byteArrayPair.key)
+          }
+        }
+      } finally {
+        if (changelogReader != null) changelogReader.close()
+      }
+    }
+    loadedVersion = endVersion
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
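Taken together, load() and replayChangelog() mean any target version can be reconstructed from the newest snapshot at or below it plus the changelogs after it. A runnable sketch of that version arithmetic (resolveLoadPlan is a hypothetical helper, not part of the PR):

    // Pick the newest snapshot <= target (what getLatestSnapshotVersion does),
    // then replay changelogs (snapshot + 1) .. target on top of it.
    def resolveLoadPlan(target: Long, snapshotVersions: Seq[Long]): (Long, Seq[Long]) = {
      val snapshot = snapshotVersions.filter(_ <= target).max
      (snapshot, (snapshot + 1) to target)
    }

    // Example: with snapshots at versions 0, 5 and 10, loading version 8
    // resolves to snapshot 5 plus changelogs 6, 7 and 8.
    assert(resolveLoadPlan(8L, Seq(0L, 5L, 10L)) == (5L, Seq(6L, 7L, 8L)))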
@@ -187,6 +244,7 @@ class RocksDB(
       }
     }
     db.put(writeOptions, key, value)
+    changelogWriter.foreach(_.put(key, value))
   }

   /**
@@ -201,6 +259,7 @@ class RocksDB(
       }
     }
     db.delete(writeOptions, key)
+    changelogWriter.foreach(_.delete(key))
   }

   /**
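The put/delete hooks above feed StateStoreChangelogWriter, whose encoding is not shown in this diff. A hedged sketch of the framing those hooks imply — the reader must be able to distinguish a put from a delete, which replayChangelog sees as a null value (record layout assumed, not confirmed by this hunk):

    import java.io.DataOutputStream

    // Assumed record layout: keyLen, keyBytes, valueLen, valueBytes, where a
    // negative valueLen marks a delete so the reader yields value == null.
    def writeRecord(out: DataOutputStream, key: Array[Byte], value: Array[Byte]): Unit = {
      out.writeInt(key.length)
      out.write(key)
      if (value == null) {
        out.writeInt(-1)               // tombstone: replayChangelog calls remove(key)
      } else {
        out.writeInt(value.length)     // put: replayChangelog calls put(key, value)
        out.write(value)
      }
    }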
@@ -286,44 +345,61 @@ class RocksDB(
    */
   def commit(): Long = {
     val newVersion = loadedVersion + 1
-    val checkpointDir = createTempDir("checkpoint")
-    var rocksDBBackgroundThreadPaused = false
     try {
-      // Make sure the directory does not exist. Native RocksDB fails if the directory to
-      // checkpoint exists.
-      Utils.deleteRecursively(checkpointDir)
-
-      logInfo(s"Flushing updates for $newVersion")
-      val flushTimeMs = timeTakenMs { db.flush(flushOptions) }
-
-      val compactTimeMs = if (conf.compactOnCommit) {
-        logInfo("Compacting")
-        timeTakenMs { db.compactRange() }
-      } else 0
-
-      logInfo("Pausing background work")
-      val pauseTimeMs = timeTakenMs {
-        db.pauseBackgroundWork()  // To avoid files being changed while committing
-        rocksDBBackgroundThreadPaused = true
-      }
-
-      logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
-      val checkpointTimeMs = timeTakenMs {
-        val cp = Checkpoint.create(db)
-        cp.createCheckpoint(checkpointDir.toString)
+      var compactTimeMs = 0L
+      var flushTimeMs = 0L
+      var checkpointTimeMs = 0L
+      if (shouldCreateSnapshot()) {
+        // Need to flush the changes to disk before creating a checkpoint
+        // because the RocksDB WAL is disabled.
+        logInfo(s"Flushing updates for $newVersion")
+        flushTimeMs = timeTakenMs { db.flush(flushOptions) }
+        if (conf.compactOnCommit) {
+          logInfo("Compacting")
+          compactTimeMs = timeTakenMs { db.compactRange() }
+        }
+        checkpointTimeMs = timeTakenMs {
+          val checkpointDir = createTempDir("checkpoint")
+          logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
+          // Make sure the directory does not exist. Native RocksDB fails if the directory to
+          // checkpoint exists.
+          Utils.deleteRecursively(checkpointDir)
+          // We no longer pause background operations before creating a RocksDB checkpoint
+          // because it is unnecessary. The captured snapshot will still be consistent with
+          // ongoing background operations.
+          val cp = Checkpoint.create(db)
+          cp.createCheckpoint(checkpointDir.toString)
+          synchronized {
+            latestCheckpoint.foreach(_.close())
+            latestCheckpoint = Some(
+              RocksDBCheckpoint(checkpointDir, newVersion, numKeysOnWritingVersion))
+            lastCheckpointVersion = newVersion
+          }
+        }
       }

       logInfo(s"Syncing checkpoint for $newVersion to DFS")
       val fileSyncTimeMs = timeTakenMs {
-        fileManager.saveCheckpointToDfs(checkpointDir, newVersion, numKeysOnWritingVersion)
+        if (enableChangelogCheckpointing) {
+          try {
+            assert(changelogWriter.isDefined)
+            changelogWriter.foreach(_.commit())
+          } finally {
+            changelogWriter = None
+          }
+        } else {
+          assert(changelogWriter.isEmpty)
+          uploadSnapshot()
+        }
       }

       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
-      fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
       commitLatencyMs ++= Map(
         "flush" -> flushTimeMs,
         "compact" -> compactTimeMs,
-        "pause" -> pauseTimeMs,
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
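With changelog checkpointing on, a typical commit now uploads only a small changelog file; full snapshots reach DFS asynchronously through uploadSnapshot() (next hunk). Assuming the file manager's naming conventions (snapshots as <version>.zip, changelogs as <version>.changelog — names inferred, not shown in this diff), a checkpoint directory with minDeltasForSnapshot = 5 would look roughly like:

    1.changelog
    2.changelog
    3.changelog
    4.changelog
    5.changelog
    5.zip          <- local snapshot created by commit 5, uploaded by the maintenance task
    6.changelog
    ...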
@@ -334,25 +410,60 @@ class RocksDB(
       loadedVersion = -1  // invalidate loaded version
       throw t
     } finally {
-      if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
-      silentDeleteRecursively(checkpointDir, s"committing $newVersion")
       // reset resources as either 1) we already pushed the changes and it has been committed or
       // 2) commit has failed and the current version is "invalidated".
       release()
     }
   }

+  private def shouldCreateSnapshot(): Boolean = {
+    if (enableChangelogCheckpointing) {
+      assert(changelogWriter.isDefined)
+      val newVersion = loadedVersion + 1
+      newVersion - lastCheckpointVersion >= conf.minDeltasForSnapshot ||
+        changelogWriter.get.size > 10000
+    } else true
+  }
+
+  private def uploadSnapshot(): Unit = {
+    val localCheckpoint = synchronized {
+      val checkpoint = latestCheckpoint
+      latestCheckpoint = None
+      checkpoint
+    }
+    localCheckpoint match {
+      case Some(RocksDBCheckpoint(localDir, version, numKeys)) =>
+        try {
+          val uploadTime = timeTakenMs {
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+          }
+          logInfo(s"$loggingId: Upload snapshot of version $version," +
+            s" time taken: $uploadTime ms")
+        } finally {
+          localCheckpoint.foreach(_.close())
+        }
+      case _ =>
+    }
+  }
+
   /**
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     loadedVersion = -1L
+    changelogWriter.foreach(_.abort())
+    // Make sure changelogWriter gets recreated next time.
+    changelogWriter = None
     release()
     logInfo(s"Rolled back to $loadedVersion")
   }

-  def cleanup(): Unit = {
+  def doMaintenance(): Unit = {
+    if (enableChangelogCheckpointing) {
+      uploadSnapshot()
+    }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
     }
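A runnable sketch of the snapshot cadence shouldCreateSnapshot() implies, assuming minDeltasForSnapshot = 5 and changelogs small enough that the 10000-entry size trigger never fires:

    // Simulates which commits create a local snapshot in addition to a changelog.
    def shouldSnapshot(newVersion: Long, lastSnapshotVersion: Long, minDeltas: Int): Boolean =
      newVersion - lastSnapshotVersion >= minDeltas

    var lastSnapshotVersion = 0L
    (1L to 12L).foreach { v =>
      if (shouldSnapshot(v, lastSnapshotVersion, minDeltas = 5)) {
        println(s"commit $v: changelog + local snapshot")  // uploaded by doMaintenance()
        lastSnapshotVersion = v
      } else {
        println(s"commit $v: changelog only")
      }
    }
    // Prints snapshots at versions 5 and 10; every other commit is changelog-only.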
@@ -369,6 +480,9 @@ class RocksDB(
       flushOptions.close()
       dbOptions.close()
       dbLogger.close()
+      synchronized {
+        latestCheckpoint.foreach(_.close())
+      }
       silentDeleteRecursively(localRootDir, "closing RocksDB")
     } catch {
       case e: Exception =>
@@ -550,7 +664,9 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
  */
 case class RocksDBConf(
     minVersionsToRetain: Int,
+    minDeltasForSnapshot: Int,
     compactOnCommit: Boolean,
+    enableChangelogCheckpointing: Boolean,
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
@@ -563,7 +679,8 @@ case class RocksDBConf(
     boundedMemoryUsage: Boolean,
     totalMemoryUsageMB: Long,
     writeBufferCacheRatio: Double,
-    highPriorityPoolRatio: Double)
+    highPriorityPoolRatio: Double,
+    compressionCodec: String)

 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -585,6 +702,8 @@ object RocksDBConf {

   // Configuration that specifies whether to compact the RocksDB data every time data is committed
   private val COMPACT_ON_COMMIT_CONF = SQLConfEntry("compactOnCommit", "false")
+  private val ENABLE_CHANGELOG_CHECKPOINTING_CONF = SQLConfEntry(
+    "changelogCheckpointing.enabled", "false")
   private val BLOCK_SIZE_KB_CONF = SQLConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = SQLConfEntry("blockCacheSizeMB", "8")
   // See SPARK-42794 for details.
@@ -705,7 +824,9 @@ object RocksDBConf {

     RocksDBConf(
       storeConf.minVersionsToRetain,
+      storeConf.minDeltasForSnapshot,
       getBooleanConf(COMPACT_ON_COMMIT_CONF),
+      getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
       getPositiveLongConf(BLOCK_SIZE_KB_CONF),
       getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF),
       getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
@@ -718,7 +839,8 @@ object RocksDBConf {
       getBooleanConf(BOUNDED_MEMORY_USAGE_CONF),
       getLongConf(MAX_MEMORY_USAGE_MB_CONF),
       getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
-      getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF))
+      getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
+      storeConf.compressionCodec)
   }

   def apply(): RocksDBConf = apply(new StateStoreConf())
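For reference, a usage sketch (e.g. in spark-shell) for turning the feature on; the full key is the RocksDB conf prefix plus the entry name above, assumed here to be spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled:

    // Use the RocksDB state store provider for the streaming query...
    spark.conf.set(
      "spark.sql.streaming.stateStore.providerClass",
      "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    // ...and opt in to changelog-based checkpointing (defaults to false).
    spark.conf.set(
      "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")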