Skip to content
Closed
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
da54bb5
initial implementation
chaoqin-li1123 May 4, 2023
5bda37f
add conf
chaoqin-li1123 May 4, 2023
8bc8552
remove unused import
chaoqin-li1123 May 4, 2023
be7846b
test correctness with changelog checkpointing
chaoqin-li1123 May 4, 2023
0512806
add unit test and fix bug
chaoqin-li1123 May 8, 2023
974a47e
clean up
chaoqin-li1123 May 8, 2023
2d81dc2
address comment
chaoqin-li1123 May 9, 2023
9df1340
fix build
chaoqin-li1123 May 9, 2023
1b0f94c
respect minDeltasForSnapshot in changelog checkpointing
chaoqin-li1123 May 9, 2023
bf30cf1
fix checkpoint interval bug and address comment
chaoqin-li1123 May 9, 2023
19b8355
address comments
chaoqin-li1123 May 10, 2023
4ff442f
clean up conf for changelog checkpointing
chaoqin-li1123 May 12, 2023
b3cc436
enable streaming aggregation suite to run with rocksdb
chaoqin-li1123 May 12, 2023
0ef2fc9
Merge branch 'master' of github.com:chaoqin-li1123/spark into changelog
chaoqin-li1123 May 12, 2023
e59d43f
clean up
chaoqin-li1123 May 14, 2023
1e46adc
add doc
chaoqin-li1123 May 14, 2023
5f53f49
address comments
chaoqin-li1123 May 15, 2023
5910fb7
address comments
chaoqin-li1123 May 15, 2023
0ee93c1
Merge branch 'master' of github.com:chaoqin-li1123/spark into changelog
chaoqin-li1123 May 18, 2023
7c65cac
add comments
chaoqin-li1123 May 19, 2023
b2ead71
address comments
chaoqin-li1123 May 19, 2023
36d9ae2
address comments
chaoqin-li1123 May 20, 2023
4aa2605
simplify
chaoqin-li1123 May 20, 2023
ff4cff9
add doc and comments
chaoqin-li1123 May 20, 2023
82e7168
add backward compatibility integration test
chaoqin-li1123 May 22, 2023
4109c29
comment out tests
chaoqin-li1123 May 23, 2023
573e0e9
comment out tests
chaoqin-li1123 May 23, 2023
fc8c1bd
comment out tests
chaoqin-li1123 May 23, 2023
6480621
move tests around to pass ci
chaoqin-li1123 May 24, 2023
590f21c
move tests around to pass ci
chaoqin-li1123 May 24, 2023
bb58556
move tests around to pass ci
chaoqin-li1123 May 25, 2023
14d7b91
move tests around to pass ci
chaoqin-li1123 May 25, 2023
99f5e0a
fix nits
chaoqin-li1123 May 25, 2023
b1d3809
improve doc
chaoqin-li1123 May 25, 2023
050f214
fix test nits
chaoqin-li1123 May 26, 2023
f723840
use NextIterator
chaoqin-li1123 May 26, 2023
da7aa99
make rocksdb state store suite use sqlconf in shared spark session
chaoqin-li1123 May 26, 2023
4f9b0a7
address testing comments
chaoqin-li1123 May 31, 2023
7d52ed5
Merge branch 'master' of github.com:chaoqin-li1123/spark into changelog
chaoqin-li1123 May 31, 2023
91d0075
add after each
chaoqin-li1123 May 31, 2023
5732fbd
fix test failure
chaoqin-li1123 May 31, 2023
6cb6d0b
Merge branch 'master' of github.com:chaoqin-li1123/spark into changelog
chaoqin-li1123 May 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2320,6 +2320,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
<td>Whether we perform a range compaction of RocksDB instance for commit operation</td>
<td>False</td>
</tr>
<tr>
<td>spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled</td>
<td>Whether to upload changelog instead of snapshot during RocksDB StateStore commit</td>
<td>False</td>
</tr>
<tr>
<td>spark.sql.streaming.stateStore.rocksdb.blockSizeKB</td>
<td>Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is a RocksDB's default SST file format.</td>
Expand Down Expand Up @@ -2389,6 +2394,12 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.

##### RocksDB State Store Changelog Checkpointing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we provide higher-level of description how this works? We even don't explain what does changelog means.

I understand we have no explanation for changelog checkpointing for HDFS backed state store provider which is unfortunate, but for RocksDB state store provider, users have to make a decision whether to use old one (incremental checkpointing) or new one, which requires them to understand the characteristics of two options before choosing one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drafted a newer version with chatgpt, PTAL.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks OK :)

Changelog checkpointing reduces latency of the stateful streaming query. This checkpointing mechanism avoids cost of capturing and uploading snapshots of RocksDB instances in the commit phase of RocksDB state store.
You can enable RocksDB State Store changelog checkpointing by setting `spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled` config to `true`.
Changelog checkpointing is backward compatible. In a version of spark that supports changelog checkpointing, you can turn on changelog checkpointing for a streaming query without discarding the existing checkpoint.
Vice versa, if a query has already run with changelog checkpointing enabled, you can turn off changelog checkpointing safely.

##### Performance-aspect considerations

1. You may want to disable the track of total number of rows to aim the better performance on RocksDB state store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
}

override def createTempFile(path: Path): FSDataOutputStream = {
fs.create(path, true, 4096)
fs.create(path, true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class RocksDB(
hadoopConf: Configuration = new Configuration,
loggingId: String = "") extends Logging {

case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
def close(): Unit = {
silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
}
}

@volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
@volatile private var lastSnapshotVersion = 0L

RocksDBLoader.loadLibrary()

// Java wrapper objects linking to native RocksDB objects
Expand Down Expand Up @@ -109,13 +118,15 @@ class RocksDB(
private val nativeStats = dbOptions.statistics()

private val workingDir = createTempDir("workingDir")
private val fileManager = new RocksDBFileManager(
dfsRootDir, createTempDir("fileManager"), hadoopConf, loggingId = loggingId)
private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
hadoopConf, conf.compressionCodec, loggingId = loggingId)
private val byteArrayPair = new ByteArrayPair()
private val commitLatencyMs = new mutable.HashMap[String, Long]()
private val acquireLock = new Object

@volatile private var db: NativeRocksDB = _
@volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
@volatile private var loadedVersion = -1L // -1 = nothing valid is loaded
@volatile private var numKeysOnLoadedVersion = 0L
@volatile private var numKeysOnWritingVersion = 0L
Expand All @@ -129,17 +140,20 @@ class RocksDB(
* Note that this will copy all the necessary file from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*/
def load(version: Long): RocksDB = {
def load(version: Long, readOnly: Boolean = false): RocksDB = {
assert(version >= 0)
acquire()
logInfo(s"Loading $version")
try {
if (loadedVersion != version) {
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
loadedVersion = latestSnapshotVersion

openDB()

val numKeys = if (!conf.trackTotalNumberOfRows) {
numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
Expand All @@ -149,10 +163,10 @@ class RocksDB(
} else {
metadata.numKeys
}
numKeysOnWritingVersion = numKeys
numKeysOnLoadedVersion = numKeys

loadedVersion = version
if (loadedVersion != version) replayChangelog(version)
// After changelog replay the numKeysOnWritingVersion will be updated to
// the correct number of keys in the loaded version.
numKeysOnLoadedVersion = numKeysOnWritingVersion
fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
}
if (conf.resetStatsOnLoad) {
Expand All @@ -164,9 +178,37 @@ class RocksDB(
loadedVersion = -1 // invalidate loaded data
throw t
}
if (enableChangelogCheckpointing && !readOnly) {
// Make sure we don't leak resource.
changelogWriter.foreach(_.abort())
changelogWriter = Some(fileManager.getChangeLogWriter(version + 1))
}
this
}

/**
* Replay change log from the loaded version to the target version.
*/
private def replayChangelog(endVersion: Long): Unit = {
for (v <- loadedVersion + 1 to endVersion) {
var changelogReader: StateStoreChangelogReader = null
try {
changelogReader = fileManager.getChangelogReader(v)
while (changelogReader.hasNext) {
val byteArrayPair = changelogReader.next()
if (byteArrayPair.value != null) {
put(byteArrayPair.key, byteArrayPair.value)
} else {
remove(byteArrayPair.key)
}
}
} finally {
if (changelogReader != null) changelogReader.close()
}
}
loadedVersion = endVersion
}

/**
* Get the value for the given key if present, or null.
* @note This will return the last written value even if it was uncommitted.
Expand All @@ -187,6 +229,7 @@ class RocksDB(
}
}
db.put(writeOptions, key, value)
changelogWriter.foreach(_.put(key, value))
}

/**
Expand All @@ -201,6 +244,7 @@ class RocksDB(
}
}
db.delete(writeOptions, key)
changelogWriter.foreach(_.delete(key))
}

/**
Expand Down Expand Up @@ -286,44 +330,66 @@ class RocksDB(
*/
def commit(): Long = {
val newVersion = loadedVersion + 1
val checkpointDir = createTempDir("checkpoint")
var rocksDBBackgroundThreadPaused = false
try {
// Make sure the directory does not exist. Native RocksDB fails if the directory to
// checkpoint exists.
Utils.deleteRecursively(checkpointDir)

logInfo(s"Flushing updates for $newVersion")
val flushTimeMs = timeTakenMs { db.flush(flushOptions) }

val compactTimeMs = if (conf.compactOnCommit) {
logInfo("Compacting")
timeTakenMs { db.compactRange() }
} else 0

logInfo("Pausing background work")
val pauseTimeMs = timeTakenMs {
db.pauseBackgroundWork() // To avoid files being changed while committing
rocksDBBackgroundThreadPaused = true
}

logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
val checkpointTimeMs = timeTakenMs {
val cp = Checkpoint.create(db)
cp.createCheckpoint(checkpointDir.toString)
var compactTimeMs = 0L
var flushTimeMs = 0L
var checkpointTimeMs = 0L
if (shouldCreateSnapshot()) {
// Need to flush the change to disk before creating a checkpoint
// because rocksdb wal is disabled.
logInfo(s"Flushing updates for $newVersion")
flushTimeMs = timeTakenMs { db.flush(flushOptions) }
if (conf.compactOnCommit) {
logInfo("Compacting")
compactTimeMs = timeTakenMs { db.compactRange() }
}
checkpointTimeMs = timeTakenMs {
val checkpointDir = createTempDir("checkpoint")
logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
// Make sure the directory does not exist. Native RocksDB fails if the directory to
// checkpoint exists.
Utils.deleteRecursively(checkpointDir)
// We no longer pause background operation before creating a RocksDB checkpoint because
// it is unnecessary. The captured snapshot will still be consistent with ongoing
// background operations.
val cp = Checkpoint.create(db)
cp.createCheckpoint(checkpointDir.toString)
synchronized {
// if changelog checkpointing is disabled, the snapshot is uploaded synchronously
// inside the uploadSnapshot() called below.
// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
// during state store maintenance.
latestSnapshot.foreach(_.close())
latestSnapshot = Some(
RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
lastSnapshotVersion = newVersion
}
}
}

logInfo(s"Syncing checkpoint for $newVersion to DFS")
val fileSyncTimeMs = timeTakenMs {
fileManager.saveCheckpointToDfs(checkpointDir, newVersion, numKeysOnWritingVersion)
if (enableChangelogCheckpointing) {
try {
assert(changelogWriter.isDefined)
changelogWriter.foreach(_.commit())
} finally {
changelogWriter = None
}
} else {
assert(changelogWriter.isEmpty)
uploadSnapshot()
}
}

numKeysOnLoadedVersion = numKeysOnWritingVersion
loadedVersion = newVersion
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
commitLatencyMs ++= Map(
"flush" -> flushTimeMs,
"compact" -> compactTimeMs,
"pause" -> pauseTimeMs,
"checkpoint" -> checkpointTimeMs,
"fileSync" -> fileSyncTimeMs
)
Expand All @@ -334,25 +400,60 @@ class RocksDB(
loadedVersion = -1 // invalidate loaded version
throw t
} finally {
if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
silentDeleteRecursively(checkpointDir, s"committing $newVersion")
// reset resources as either 1) we already pushed the changes and it has been committed or
// 2) commit has failed and the current version is "invalidated".
release()
}
}

private def shouldCreateSnapshot(): Boolean = {
if (enableChangelogCheckpointing) {
assert(changelogWriter.isDefined)
val newVersion = loadedVersion + 1
newVersion - lastSnapshotVersion >= conf.minDeltasForSnapshot ||
changelogWriter.get.size > 10000
} else true
}

private def uploadSnapshot(): Unit = {
val localCheckpoint = synchronized {
val checkpoint = latestSnapshot
latestSnapshot = None
checkpoint
}
localCheckpoint match {
case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
try {
val uploadTime = timeTakenMs {
fileManager.saveCheckpointToDfs(localDir, version, numKeys)
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
}
logInfo(s"$loggingId: Upload snapshot of version $version," +
s" time taken: $uploadTime ms")
} finally {
localCheckpoint.foreach(_.close())
}
case _ =>
}
}

/**
* Drop uncommitted changes, and roll back to previous version.
*/
def rollback(): Unit = {
numKeysOnWritingVersion = numKeysOnLoadedVersion
loadedVersion = -1L
changelogWriter.foreach(_.abort())
// Make sure changelogWriter gets recreated next time.
changelogWriter = None
release()
logInfo(s"Rolled back to $loadedVersion")
}

def cleanup(): Unit = {
def doMaintenance(): Unit = {
if (enableChangelogCheckpointing) {
uploadSnapshot()
}
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(conf.minVersionsToRetain)
}
Expand All @@ -369,6 +470,9 @@ class RocksDB(
flushOptions.close()
dbOptions.close()
dbLogger.close()
synchronized {
latestSnapshot.foreach(_.close())
}
silentDeleteRecursively(localRootDir, "closing RocksDB")
} catch {
case e: Exception =>
Expand Down Expand Up @@ -550,7 +654,9 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
*/
case class RocksDBConf(
minVersionsToRetain: Int,
minDeltasForSnapshot: Int,
compactOnCommit: Boolean,
enableChangelogCheckpointing: Boolean,
blockSizeKB: Long,
blockCacheSizeMB: Long,
lockAcquireTimeoutMs: Long,
Expand All @@ -563,7 +669,8 @@ case class RocksDBConf(
boundedMemoryUsage: Boolean,
totalMemoryUsageMB: Long,
writeBufferCacheRatio: Double,
highPriorityPoolRatio: Double)
highPriorityPoolRatio: Double,
compressionCodec: String)

object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
Expand All @@ -585,6 +692,8 @@ object RocksDBConf {

// Configuration that specifies whether to compact the RocksDB data every time data is committed
private val COMPACT_ON_COMMIT_CONF = SQLConfEntry("compactOnCommit", "false")
private val ENABLE_CHANGELOG_CHECKPOINTING_CONF = SQLConfEntry(
"changelogCheckpointing.enabled", "false")
private val BLOCK_SIZE_KB_CONF = SQLConfEntry("blockSizeKB", "4")
private val BLOCK_CACHE_SIZE_MB_CONF = SQLConfEntry("blockCacheSizeMB", "8")
// See SPARK-42794 for details.
Expand Down Expand Up @@ -705,7 +814,9 @@ object RocksDBConf {

RocksDBConf(
storeConf.minVersionsToRetain,
storeConf.minDeltasForSnapshot,
getBooleanConf(COMPACT_ON_COMMIT_CONF),
getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
getPositiveLongConf(BLOCK_SIZE_KB_CONF),
getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF),
getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
Expand All @@ -718,7 +829,8 @@ object RocksDBConf {
getBooleanConf(BOUNDED_MEMORY_USAGE_CONF),
getLongConf(MAX_MEMORY_USAGE_MB_CONF),
getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF))
getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
storeConf.compressionCodec)
}

def apply(): RocksDBConf = apply(new StateStoreConf())
Expand Down
Loading