diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 55f648a4a05c8..faa7033a147d9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -148,7 +148,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       AppStatusStore.CURRENT_VERSION,
       logDir.toString())
     try {
-      open(dbPath, metadata)
+      open(dbPath, metadata, conf)
     } catch {
       // If there's an error, remove the listing database and any existing UI database
       // from the store directory, since it's extremely likely that they'll all contain
@@ -156,12 +156,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
         logInfo("Detected incompatible DB versions, deleting...")
         path.listFiles().foreach(Utils.deleteRecursively)
-        open(dbPath, metadata)
+        open(dbPath, metadata, conf)
       case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
         // Get rid of the corrupted data and re-create it.
         logWarning(s"Failed to load disk store $dbPath :", dbExc)
         Utils.deleteRecursively(dbPath)
-        open(dbPath, metadata)
+        open(dbPath, metadata, conf)
     }
   }.getOrElse(new InMemoryStore())

@@ -1218,7 +1218,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // the existing data.
     dm.openStore(appId, attempt.info.attemptId).foreach { path =>
       try {
-        return KVUtils.open(path, metadata)
+        return KVUtils.open(path, metadata, conf)
       } catch {
         case e: Exception =>
           logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e)
@@ -1284,14 +1284,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
       lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
-      val diskStore = KVUtils.open(lease.tmpPath, metadata)
+      val diskStore = KVUtils.open(lease.tmpPath, metadata, conf)
       hybridStore.setDiskStore(diskStore)
       hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener {
         override def onSwitchToDiskStoreSuccess: Unit = {
           logInfo(s"Completely switched to diskStore for app $appId / ${attempt.info.attemptId}.")
           diskStore.close()
           val newStorePath = lease.commit(appId, attempt.info.attemptId)
-          hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata))
+          hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf))
           memoryManager.release(appId, attempt.info.attemptId)
         }
         override def onSwitchToDiskStoreFail(e: Exception): Unit = {
@@ -1327,7 +1327,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
     val lease = dm.lease(reader.totalSize, isCompressed)
     try {
-      Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
+      Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf)) { store =>
         rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
       }
       newStorePath = lease.commit(appId, attempt.info.attemptId)
@@ -1345,7 +1345,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }

-    KVUtils.open(newStorePath, metadata)
+    KVUtils.open(newStorePath, metadata, conf)
   }

   private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index ddee539eb9eb4..7a4b613ac0696 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -38,8 +38,8 @@ private[spark] object KVUtils extends Logging {
   /** Use this to annotate constructor params to be used as KVStore indices. */
   type KVIndexParam = KVIndex @getter

-  private lazy val backend =
-    HybridStoreDiskBackend.withName(new SparkConf().get(HYBRID_STORE_DISK_BACKEND))
+  private def backend(conf: SparkConf) =
+    HybridStoreDiskBackend.withName(conf.get(HYBRID_STORE_DISK_BACKEND))

   /**
    * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
@@ -59,11 +59,12 @@ private[spark] object KVUtils extends Logging {
    * @param metadata Metadata value to compare to the data in the store. If the store does not
    *                 contain any metadata (e.g. it's a new store), this value is written as
    *                 the store's metadata.
+   * @param conf SparkConf used to get `HYBRID_STORE_DISK_BACKEND`
    */
-  def open[M: ClassTag](path: File, metadata: M): KVStore = {
+  def open[M: ClassTag](path: File, metadata: M, conf: SparkConf): KVStore = {
     require(metadata != null, "Metadata is required.")

-    val db = backend match {
+    val db = backend(conf) match {
       case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
       case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
     }
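The KVUtils hunk above is the heart of the patch: the old `private lazy val backend` read `HYBRID_STORE_DISK_BACKEND` from a fresh `new SparkConf()` exactly once per JVM, so every later caller silently inherited whichever backend was resolved first. Passing the caller's `SparkConf` into `open` makes the backend a per-call decision, which is what lets the test suites below exercise LevelDB and RocksDB side by side. A minimal sketch of the new call shape, assuming a scratch directory and a throwaway metadata string (neither comes from the patch); since `KVUtils` is `private[spark]`, the sketch has to live under the `org.apache.spark` package:

package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
import org.apache.spark.util.Utils

// Hypothetical driver: open one disk store per backend in the same JVM,
// with the choice carried by the SparkConf handed to each open() call.
object KVUtilsOpenSketch {
  def main(args: Array[String]): Unit = {
    Seq(HybridStoreDiskBackend.LEVELDB, HybridStoreDiskBackend.ROCKSDB).foreach { backend =>
      val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)
      val dir = Utils.createTempDir()
      // The same metadata value must be supplied when reopening the store.
      val store = KVUtils.open(dir, "sketch-metadata", conf)
      try {
        // ... read and write through the KVStore API ...
      } finally {
        store.close()
        Utils.deleteRecursively(dir)
      }
    }
  }
}

With the old lazy val, the second pass of this loop would have reused the first pass's backend no matter what its conf said; after the patch each store honors its own conf.
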
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
index de5b5187aa2fa..c534d66c1571c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -26,13 +26,20 @@ import org.scalatest.BeforeAndAfter

 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.status.KVUtils
-import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
 import org.apache.spark.util.{ManualClock, Utils}
 import org.apache.spark.util.kvstore.KVStore

-@ExtendedLevelDBTest
-class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
+abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  protected def backend: HybridStoreDiskBackend.Value
+
+  protected def extension: String
+
+  protected def conf: SparkConf = new SparkConf()
+    .set(HYBRID_STORE_DISK_BACKEND, backend.toString)

   private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)

@@ -43,7 +50,7 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {

   before {
     testDir = Utils.createTempDir()
-    store = KVUtils.open(new File(testDir, "listing"), "test")
+    store = KVUtils.open(new File(testDir, "listing"), "test", conf)
   }

   after {
@@ -213,10 +220,20 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
   }

   test("SPARK-38095: appStorePath should use backend extensions") {
-    HybridStoreDiskBackend.values.zip(Seq(".ldb", ".rdb")).foreach { case (backend, extension) =>
-      val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)
-      val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock())
-      assert(manager.appStorePath("appId", None).getName.endsWith(extension))
-    }
+    val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)
+    val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock())
+    assert(manager.appStorePath("appId", None).getName.endsWith(extension))
   }
 }
+
+@ExtendedLevelDBTest
+class HistoryServerDiskManagerUseLevelDBSuite extends HistoryServerDiskManagerSuite {
+  override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.LEVELDB
+  override protected def extension: String = ".ldb"
+}
+
+@ExtendedRocksDBTest
+class HistoryServerDiskManagerUseRocksDBSuite extends HistoryServerDiskManagerSuite {
+  override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB
+  override protected def extension: String = ".rdb"
+}
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index c6db626121fa2..5e2e931c37689 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter

 import org.apache.spark._
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.resource.ResourceProfile
@@ -36,15 +37,11 @@ import org.apache.spark.scheduler.cluster._
 import org.apache.spark.status.ListenerEventsTestHelper._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
-import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

-@ExtendedLevelDBTest
-class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
-  private val conf = new SparkConf()
-    .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
-    .set(ASYNC_TRACKING_ENABLED, false)
+abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {

   private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)

@@ -53,7 +50,11 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
   private var store: ElementTrackingStore = _
   private var taskIdTracker = -1L

-  protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName())
+  protected def conf: SparkConf = new SparkConf()
+    .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+    .set(ASYNC_TRACKING_ENABLED, false)
+
+  protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName(), conf)

   before {
     time = 0L
@@ -1891,3 +1892,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 class AppStatusListenerWithInMemoryStoreSuite extends AppStatusListenerSuite {
   override def createKVStore: KVStore = new InMemoryStore()
 }
+
+@ExtendedLevelDBTest
+class AppStatusListenerWithLevelDBSuite extends AppStatusListenerSuite {
+  override def conf: SparkConf = super.conf
+    .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString)
+}
+
+@ExtendedRocksDBTest
+class AppStatusListenerWithRocksDBSuite extends AppStatusListenerSuite {
+  override def conf: SparkConf = super.conf
+    .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
+}
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 422d80976867d..798cff8d60fcd 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.status

 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.{SparkListenerStageSubmitted, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
@@ -81,7 +82,8 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }

-  private def createAppStore(disk: Boolean, live: Boolean): AppStatusStore = {
+  private def createAppStore(disk: Boolean, diskStoreType: HybridStoreDiskBackend.Value = null,
+      live: Boolean): AppStatusStore = {
     val conf = new SparkConf()
     if (live) {
       return AppStatusStore.createLiveStore(conf)
@@ -92,8 +94,9 @@ class AppStatusStoreSuite extends SparkFunSuite {
     }

     val store: KVStore = if (disk) {
+      conf.set(HYBRID_STORE_DISK_BACKEND, diskStoreType.toString)
       val testDir = Utils.createTempDir()
-      val diskStore = KVUtils.open(testDir, getClass.getName)
+      val diskStore = KVUtils.open(testDir, getClass.getName, conf)
       new ElementTrackingStore(diskStore, conf)
     } else {
       new ElementTrackingStore(new InMemoryStore, conf)
@@ -102,7 +105,8 @@ class AppStatusStoreSuite extends SparkFunSuite {
   }

   Seq(
-    "disk" -> createAppStore(disk = true, live = false),
+    "disk leveldb" -> createAppStore(disk = true, HybridStoreDiskBackend.LEVELDB, live = false),
+    "disk rocksdb" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = false),
     "in memory" -> createAppStore(disk = false, live = false),
     "in memory live" -> createAppStore(disk = false, live = true)
   ).foreach { case (hint, appStore) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index eee1a7c5ff3cd..1d1b51354f8d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -23,6 +23,8 @@ import java.util.{Date, UUID}
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.scalatest.time.SpanSugar._

+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
 import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
 import org.apache.spark.sql.internal.StaticSQLConf
@@ -30,7 +32,7 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
 import org.apache.spark.sql.streaming
 import org.apache.spark.status.{ElementTrackingStore, KVUtils}
 import org.apache.spark.util.Utils
-import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, RocksDB}
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

 class StreamingQueryStatusListenerSuite extends StreamTest {

@@ -221,8 +223,10 @@ class StreamingQueryStatusListenerSuite extends StreamTest {

   test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") {
     assume(!Utils.isMacOnAppleSilicon)
+    val conf = new SparkConf()
+      .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString)
     val testDir = Utils.createTempDir()
-    val kvStore = KVUtils.open(testDir, getClass.getName)
+    val kvStore = KVUtils.open(testDir, getClass.getName, conf)
     try {
       testStreamingQueryData(kvStore)
     } finally {
@@ -233,8 +237,10 @@ class StreamingQueryStatusListenerSuite extends StreamTest {

   test("SPARK-38056: test writing StreamingQueryData to a RocksDB store") {
     assume(!Utils.isMacOnAppleSilicon)
+    val conf = new SparkConf()
+      .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
     val testDir = Utils.createTempDir()
-    val kvStore = new RocksDB(testDir)
+    val kvStore = KVUtils.open(testDir, getClass.getName, conf)
     try {
       testStreamingQueryData(kvStore)
     } finally {
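
All four suites are refactored along the same lines: the previously LevelDB-only suite becomes an abstract base class, the backend choice moves into an overridable `conf`, and one tagged concrete subclass is added per backend. A sketch of that template applied to a hypothetical new store-backed suite (the suite names and test body are illustrative, not from the patch):

package org.apache.spark.status

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
import org.apache.spark.util.Utils

// Hypothetical template mirroring the pattern the patch applies above:
// shared test logic in an abstract class, backend chosen per subclass
// through the SparkConf handed to KVUtils.open.
abstract class MyDiskStoreSuite extends SparkFunSuite {
  protected def backend: HybridStoreDiskBackend.Value

  protected def conf: SparkConf =
    new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)

  test("opens a store with the configured backend") {
    val dir = Utils.createTempDir()
    val store = KVUtils.open(dir, "test", conf)
    try {
      // ... exercise the store ...
    } finally {
      store.close()
      Utils.deleteRecursively(dir)
    }
  }
}

@ExtendedLevelDBTest
class MyDiskStoreLevelDBSuite extends MyDiskStoreSuite {
  override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.LEVELDB
}

@ExtendedRocksDBTest
class MyDiskStoreRocksDBSuite extends MyDiskStoreSuite {
  override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB
}

Keeping one concrete class per backend, rather than looping over backends inside a single test, lets the `@ExtendedLevelDBTest` and `@ExtendedRocksDBTest` tags include or exclude each native backend independently in CI.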