Commit c05d0fd

anishshri-db authored and HeartSaVioR committed
[SPARK-39781][SS] Add support for providing max_open_files to rocksdb state store provider
### What changes were proposed in this pull request?

For some large users of stateful queries with lots of RocksDB-related files open, they run into IO exceptions around "too many open files":

```
Job aborted due to stage failure:
...
: org.rocksdb.RocksDBException: While open a file for random read: ... XXX.sst: Too many open files
```

This change allows configuring the max_open_files property for the underlying RocksDB instance.

### Why are the changes needed?

By default, the value for maxOpenFiles is -1, which means the DB keeps all opened files open. In some cases, however, this hits the OS limit and crashes the process. This change adds a state store config option that sets maxOpenFiles for RocksDB to a finite value, so the number of opened files can be bounded per RocksDB instance.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added tests to validate the config passed through a RocksDB conf as well as through the Spark session.

```
[info] - RocksDB confs are passed correctly from SparkSession to db instance (2 seconds, 377 milliseconds)
12:54:57.927 WARN org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBStateStoreSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 4 seconds, 24 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

```
[info] RocksDBSuite:
12:55:56.165 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[info] - SPARK-39781: adding valid max_open_files=-1 config property for RocksDB state store instance should succeed (1 second, 553 milliseconds)
[info] - SPARK-39781: adding valid max_open_files=100 config property for RocksDB state store instance should succeed (664 milliseconds)
[info] - SPARK-39781: adding valid max_open_files=1000 config property for RocksDB state store instance should succeed (558 milliseconds)
[info] - SPARK-39781: adding invalid max_open_files=test config property for RocksDB state store instance should fail (9 milliseconds)
[info] - SPARK-39781: adding invalid max_open_files=true config property for RocksDB state store instance should fail (8 milliseconds)
[info] Run completed in 3 seconds, 815 milliseconds.
[info] Total number of tests run: 5
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 5, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #37196 from anishshri-db/task/SPARK-39781.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 3a39090 commit c05d0fd
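
For context, a minimal sketch of how a streaming application would opt in to the new setting. The config keys come from this commit; the session setup and the value 1000 are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session setup, for illustration only.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("RocksDBMaxOpenFilesDemo")
  .getOrCreate()

// Use the RocksDB state store provider for streaming state.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// Bound the number of files each RocksDB instance keeps open
// (the default of -1 keeps all opened files open).
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", "1000")
```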

File tree

4 files changed: +71 lines, -2 lines

docs/structured-streaming-programming-guide.md

Lines changed: 5 additions & 0 deletions
```diff
@@ -1958,6 +1958,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
     <td>The waiting time in millisecond for acquiring lock in the load operation for RocksDB instance.</td>
     <td>60000</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.maxOpenFiles</td>
+    <td>The number of open files that can be used by the RocksDB instance. Value of -1 means that files opened are always kept open. If the open file limit is reached, RocksDB will evict entries from the open file cache and close those file descriptors and remove the entries from the cache.</td>
+    <td>-1</td>
+  </tr>
   <tr>
     <td>spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad</td>
     <td>Whether we resets all ticker and histogram stats for RocksDB on load.</td>
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 15 additions & 2 deletions
```diff
@@ -75,6 +75,7 @@ class RocksDB(
   private val dbOptions = new Options() // options to open the RocksDB
   dbOptions.setCreateIfMissing(true)
   dbOptions.setTableFormatConfig(tableFormatConfig)
+  dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
   private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
   dbOptions.setStatistics(new Statistics())
   private val nativeStats = dbOptions.statistics()
@@ -542,7 +543,8 @@ case class RocksDBConf(
     lockAcquireTimeoutMs: Long,
     resetStatsOnLoad : Boolean,
     formatVersion: Int,
-    trackTotalNumberOfRows: Boolean)
+    trackTotalNumberOfRows: Boolean,
+    maxOpenFiles: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -558,6 +560,9 @@ object RocksDBConf {
   private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
   private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
   private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+  // Config to specify the number of open files that can be used by the DB. Value of -1 means
+  // that files opened are always kept open.
+  private val MAX_OPEN_FILES_CONF = ConfEntry("maxOpenFiles", "-1")
   // Configuration to set the RocksDB format version. When upgrading the RocksDB version in Spark,
   // it may introduce a new table format version that can not be supported by an old RocksDB version
   // used by an old Spark version. Hence, we store the table format version in the checkpoint when
@@ -588,6 +593,13 @@ object RocksDBConf {
       }
     }
 
+    def getIntConf(conf: ConfEntry): Int = {
+      Try { confs.getOrElse(conf.fullName, conf.default).toInt } getOrElse {
+        throw new IllegalArgumentException(s"Invalid value for '${conf.fullName}', " +
+          "must be an integer")
+      }
+    }
+
     def getPositiveLongConf(conf: ConfEntry): Long = {
       Try { confs.getOrElse(conf.fullName, conf.default).toLong } filter { _ >= 0 } getOrElse {
         throw new IllegalArgumentException(
@@ -610,7 +622,8 @@ object RocksDBConf {
       getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
       getBooleanConf(RESET_STATS_ON_LOAD),
       getPositiveIntConf(FORMAT_VERSION),
-      getBooleanConf(TRACK_TOTAL_NUMBER_OF_ROWS))
+      getBooleanConf(TRACK_TOTAL_NUMBER_OF_ROWS),
+      getIntConf(MAX_OPEN_FILES_CONF))
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
```
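
The added getIntConf follows the same Try-based pattern as the existing getters: parse the string, and reject anything that is not an integer. A self-contained sketch of that pattern, with a simplified signature standing in for the ConfEntry machinery (names here are illustrative, not the actual Spark internals):

```scala
import scala.util.Try

// Parse a conf value as an Int, failing loudly on non-integer input.
def parseIntConf(fullName: String, value: String): Int =
  Try(value.toInt).getOrElse {
    throw new IllegalArgumentException(s"Invalid value for '$fullName', must be an integer")
  }

parseIntConf("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", "-1")   // OK: -1
parseIntConf("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", "1000") // OK: 1000
// parseIntConf(..., "test") or parseIntConf(..., "true") throws
// IllegalArgumentException, matching the invalid-value tests below.
```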

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -75,6 +75,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
         classOf[RocksDBStateStoreProvider].getName),
       (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".compactOnCommit", "true"),
       (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10"),
+      (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
       (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
     )
     testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -100,6 +101,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(rocksDBConfInTask.compactOnCommit == true)
       assert(rocksDBConfInTask.lockAcquireTimeoutMs == 10L)
       assert(rocksDBConfInTask.formatVersion == 4)
+      assert(rocksDBConfInTask.maxOpenFiles == 1000)
     }
   }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 49 additions & 0 deletions
```diff
@@ -472,6 +472,55 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  // Add tests to check valid and invalid values for max_open_files passed to the underlying
+  // RocksDB instance.
+  Seq("-1", "100", "1000").foreach { maxOpenFiles =>
+    test(s"SPARK-39781: adding valid max_open_files=$maxOpenFiles config property " +
+      "for RocksDB state store instance should succeed") {
+      withTempDir { dir =>
+        val sqlConf = SQLConf.get
+        sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", maxOpenFiles)
+        val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+        assert(dbConf.maxOpenFiles === maxOpenFiles.toInt)
+
+        val remoteDir = dir.getCanonicalPath
+        withDB(remoteDir, conf = dbConf) { db =>
+          // Do some DB ops
+          db.load(0)
+          db.put("a", "1")
+          db.commit()
+          assert(toStr(db.get("a")) === "1")
+        }
+      }
+    }
+  }
+
+  Seq("test", "true").foreach { maxOpenFiles =>
+    test(s"SPARK-39781: adding invalid max_open_files=$maxOpenFiles config property " +
+      "for RocksDB state store instance should fail") {
+      withTempDir { dir =>
+        val ex = intercept[IllegalArgumentException] {
+          val sqlConf = SQLConf.get
+          sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles",
+            maxOpenFiles)
+          val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+          assert(dbConf.maxOpenFiles === maxOpenFiles.toInt)
+
+          val remoteDir = dir.getCanonicalPath
+          withDB(remoteDir, conf = dbConf) { db =>
+            // Do some DB ops
+            db.load(0)
+            db.put("a", "1")
+            db.commit()
+            assert(toStr(db.get("a")) === "1")
+          }
+        }
+        assert(ex.getMessage.contains("Invalid value for"))
+        assert(ex.getMessage.contains("must be an integer"))
+      }
+    }
+  }
+
   test("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart") {
     withTempDir { dir =>
       val remoteDir = dir.getCanonicalPath
```
