diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 25ea75acc37d3..891c5bffa52ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -128,6 +128,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
   private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
 
+  private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED)
+
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
     val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath()).toFile()
@@ -158,6 +160,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  private var memoryManager: HistoryServerMemoryManager = null
+  if (hybridStoreEnabled) {
+    memoryManager = new HistoryServerMemoryManager(conf)
+  }
+
   private val fileCompactor = new EventLogFileCompactor(conf, hadoopConf, fs,
     conf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN),
     conf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD))
@@ -262,6 +269,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private def startPolling(): Unit = {
     diskManager.foreach(_.initialize())
+    if (memoryManager != null) {
+      memoryManager.initialize()
+    }
 
     // Validate the log directory.
     val path = new Path(logDir)
@@ -1167,6 +1177,95 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // If the hybrid store is enabled, try it first and fall back to the LevelDB store on failure.
+    if (hybridStoreEnabled) {
+      try {
+        return createHybridStore(dm, appId, attempt, metadata)
+      } catch {
+        case e: Exception =>
+          logInfo(s"Failed to create HybridStore for $appId/${attempt.info.attemptId}." +
+            " Using LevelDB.", e)
+      }
+    }
+
+    createLevelDBStore(dm, appId, attempt, metadata)
+  }
+
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+    var retried = false
+    var hybridStore: HybridStore = null
+    val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+      attempt.lastIndex)
+
+    // Use InMemoryStore to rebuild the app store
+    while (hybridStore == null) {
+      // A RuntimeException will be thrown if the heap memory is not sufficient
+      memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize,
+        reader.compressionCodec)
+      var store: HybridStore = null
+      try {
+        store = new HybridStore()
+        rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
+        hybridStore = store
+      } catch {
+        case _: IOException if !retried =>
+          // Compaction may touch the file(s) that the app rebuild wants to read.
+          // Compaction won't run again within a short interval, so try again...
+          logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " +
+            "trying again...")
+          store.close()
+          memoryManager.release(appId, attempt.info.attemptId)
+          retried = true
+        case e: Exception =>
+          store.close()
+          memoryManager.release(appId, attempt.info.attemptId)
+          throw e
+      }
+    }
+
+    // Create a LevelDB store and start a background thread to dump data to it
+    var lease: dm.Lease = null
+    try {
+      logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+      lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
+      val levelDB = KVUtils.open(lease.tmpPath, metadata)
+      hybridStore.setLevelDB(levelDB)
+      hybridStore.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
+        override def onSwitchToLevelDBSuccess(): Unit = {
+          logInfo(s"Completely switched to LevelDB for app $appId / ${attempt.info.attemptId}.")
+          levelDB.close()
+          val newStorePath = lease.commit(appId, attempt.info.attemptId)
+          hybridStore.setLevelDB(KVUtils.open(newStorePath, metadata))
+          memoryManager.release(appId, attempt.info.attemptId)
+        }
+
+        override def onSwitchToLevelDBFail(e: Exception): Unit = {
+          logWarning(s"Failed to switch to LevelDB for app $appId / ${attempt.info.attemptId}", e)
+          levelDB.close()
+          lease.rollback()
+        }
+      }, appId, attempt.info.attemptId)
+    } catch {
+      case e: Exception =>
+        hybridStore.close()
+        memoryManager.release(appId, attempt.info.attemptId)
+        if (lease != null) {
+          lease.rollback()
+        }
+        throw e
+    }
+
+    hybridStore
+  }
+
+  private def createLevelDBStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
     var retried = false
     var newStorePath: File = null
     while (newStorePath == null) {
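Note the invariant the rebuild loop above maintains: every successful `memoryManager.lease()` is paired with a `memoryManager.release()`, either on a failure path or, on success, later when the dump to disk completes. A minimal sketch of that discipline (generic stand-ins, not the Spark code itself):

```scala
// Sketch only: a generic lease/release pairing like the one createHybridStore uses.
def buildWithLease[T](lease: () => Unit, release: () => Unit)(build: () => T): T = {
  lease() // may throw if the memory budget is already exhausted
  try {
    build() // on success the lease stays held; it is released later, after the dump
  } catch {
    case e: Exception =>
      release() // any failure returns the budget immediately
      throw e
  }
}
```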
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
new file mode 100644
index 0000000000000..7fc0722233854
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.util.Utils
+
+/**
+ * A class used to keep track of in-memory store usage by the SHS.
+ */
+private class HistoryServerMemoryManager(
+    conf: SparkConf) extends Logging {
+
+  private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+    logInfo("Initialized memory manager: " +
+      s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+      s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  def lease(
+      appId: String,
+      attemptId: Option[String],
+      eventLogSize: Long,
+      codec: Option[String]): Unit = {
+    val memoryUsage = approximateMemoryUsage(eventLogSize, codec)
+    if (memoryUsage + currentUsage.get > maxUsage) {
+      throw new RuntimeException("Not enough memory to create hybrid store " +
+        s"for app $appId / $attemptId.")
+    }
+    active.synchronized {
+      active(appId -> attemptId) = memoryUsage
+    }
+    currentUsage.addAndGet(memoryUsage)
+    logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
+      s"app $appId / $attemptId")
+  }
+
+  def release(appId: String, attemptId: Option[String]): Unit = {
+    val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }
+
+    memoryUsage match {
+      case Some(m) =>
+        currentUsage.addAndGet(-m)
+        logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
+          s"app $appId / $attemptId")
+      case None =>
+    }
+  }
+
+  // Rough estimate of how much heap the rebuilt store will need, derived from
+  // the on-disk event log size and its compression codec.
+  private def approximateMemoryUsage(eventLogSize: Long, codec: Option[String]): Long = {
+    codec match {
+      case Some("zstd") =>
+        // zstd compresses event logs aggressively, so the in-memory form is much larger
+        eventLogSize * 10
+      case Some(_) =>
+        eventLogSize * 4
+      case None =>
+        // uncompressed JSON shrinks once parsed into store objects
+        eventLogSize / 2
+    }
+  }
+}
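For intuition about the sizing heuristic in `approximateMemoryUsage`, here is a quick, self-contained check of the multipliers. The function mirrors the one above; the sizes are illustrative, not measurements:

```scala
// Mirrors approximateMemoryUsage: estimate heap needed from the event log's size and codec.
def estimate(eventLogSize: Long, codec: Option[String]): Long = codec match {
  case Some("zstd") => eventLogSize * 10 // highly compressed logs expand the most
  case Some(_)      => eventLogSize * 4  // other codecs (lz4, snappy, ...) expand less
  case None         => eventLogSize / 2  // plain JSON shrinks once parsed
}

val mb = 1024L * 1024L
assert(estimate(100 * mb, Some("zstd")) == 1000 * mb) // 100 MB zstd log -> ~1 GB of heap
assert(estimate(100 * mb, None) == 50 * mb)           // 100 MB plain log -> ~50 MB of heap
```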
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
new file mode 100644
index 0000000000000..96db86f8e745a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.IOException
+import java.util.Collection
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.kvstore._
+
+/**
+ * An implementation of KVStore that accelerates event log loading.
+ *
+ * When rebuilding the application state from event logs, HybridStore first
+ * writes data to an InMemoryStore and then uses a background thread to dump
+ * the data to LevelDB once the app store is fully restored. Write operations
+ * (other than those needed for caching) are not expected after
+ * switchToLevelDB() has been called.
+ */
+private[history] class HybridStore extends KVStore {
+
+  private val inMemoryStore = new InMemoryStore()
+
+  private var levelDB: LevelDB = null
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB
+  private val shouldUseInMemoryStore = new AtomicBoolean(true)
+
+  // Flag to indicate whether this hybrid store is closed, used to avoid
+  // starting the background thread after the store is closed
+  private val closed = new AtomicBoolean(false)
+
+  // A background thread that dumps data from inMemoryStore to levelDB
+  private var backgroundThread: Thread = null
+
+  // A hash map that stores all classes that have been written to inMemoryStore
+  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+
+  override def getMetadata[T](klass: Class[T]): T = {
+    getStore().getMetadata(klass)
+  }
+
+  override def setMetadata(value: Object): Unit = {
+    getStore().setMetadata(value)
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Object): T = {
+    getStore().read(klass, naturalKey)
+  }
+
+  override def write(value: Object): Unit = {
+    getStore().write(value)
+
+    if (backgroundThread == null) {
+      // New classes won't be dumped once the background thread is started
+      klassMap.putIfAbsent(value.getClass(), true)
+    }
+  }
+
+  override def delete(klass: Class[_], naturalKey: Object): Unit = {
+    if (backgroundThread != null) {
+      throw new IllegalStateException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB")
+    }
+
+    getStore().delete(klass, naturalKey)
+  }
+
+  override def view[T](klass: Class[T]): KVStoreView[T] = {
+    getStore().view(klass)
+  }
+
+  override def count(klass: Class[_]): Long = {
+    getStore().count(klass)
+  }
+
+  override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
+    getStore().count(klass, index, indexedValue)
+  }
+
+  override def close(): Unit = {
+    try {
+      closed.set(true)
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join()
+      }
+    } finally {
+      inMemoryStore.close()
+      if (levelDB != null) {
+        levelDB.close()
+      }
+    }
+  }
+
+  override def removeAllByIndexValues[T](
+      klass: Class[T],
+      index: String,
+      indexValues: Collection[_]): Boolean = {
+    if (backgroundThread != null) {
+      throw new IllegalStateException("removeAllByIndexValues() shouldn't be " +
+        "called after the hybrid store begins switching to levelDB")
+    }
+
+    getStore().removeAllByIndexValues(klass, index, indexValues)
+  }
+
+  def setLevelDB(levelDB: LevelDB): Unit = {
+    this.levelDB = levelDB
+  }
+
+  /**
+   * This method is called once writing to inMemoryStore is done. A background
+   * thread is created and started to dump the data in inMemoryStore to levelDB.
+   * Once the dump is completed, the underlying kvstore is switched to levelDB.
+   */
+  def switchToLevelDB(
+      listener: HybridStore.SwitchToLevelDBListener,
+      appId: String,
+      attemptId: Option[String]): Unit = {
+    if (closed.get) {
+      return
+    }
+
+    backgroundThread = new Thread(() => {
+      try {
+        for (klass <- klassMap.keys().asScala) {
+          val it = inMemoryStore.view(klass).closeableIterator()
+          while (it.hasNext()) {
+            levelDB.write(it.next())
+          }
+        }
+        listener.onSwitchToLevelDBSuccess()
+        shouldUseInMemoryStore.set(false)
+        inMemoryStore.close()
+      } catch {
+        case e: Exception =>
+          listener.onSwitchToLevelDBFail(e)
+      }
+    })
+    backgroundThread.setDaemon(true)
+    backgroundThread.setName(s"hybridstore-$appId-$attemptId")
+    backgroundThread.start()
+  }
+
+  /**
+   * Returns the store currently in use: inMemoryStore until the switch
+   * completes, levelDB afterwards.
+   */
+  private def getStore(): KVStore = {
+    if (shouldUseInMemoryStore.get) {
+      inMemoryStore
+    } else {
+      levelDB
+    }
+  }
+}
+
+private[history] object HybridStore {
+
+  trait SwitchToLevelDBListener {
+
+    def onSwitchToLevelDBSuccess(): Unit
+
+    def onSwitchToLevelDBFail(e: Exception): Unit
+  }
+}
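Taken together, the provider drives a `HybridStore` through three phases: replay into memory, hand over a LevelDB instance, then switch. A condensed sketch of that call sequence (illustrative only; `HybridStore` is `private[history]`, and the `replay` callback stands in for `rebuildAppStore`):

```scala
package org.apache.spark.deploy.history // sketch assumes package-private access

import org.apache.spark.util.kvstore.{KVStore, LevelDB}

object HybridStoreFlow {
  // `levelDB` stands in for a store opened via KVUtils.open on a disk lease.
  def load(levelDB: LevelDB, appId: String, attemptId: Option[String])
          (replay: KVStore => Unit): KVStore = {
    val store = new HybridStore()
    replay(store) // phase 1: every write lands in the InMemoryStore
    store.setLevelDB(levelDB) // phase 2: attach the disk store
    store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
      override def onSwitchToLevelDBSuccess(): Unit = {
        // phase 3: reads are now served from LevelDB; the InMemoryStore is closed
      }
      override def onSwitchToLevelDBFail(e: Exception): Unit = {
        // dump failed: keep serving from memory; the caller rolls back its disk lease
      }
    }, appId, attemptId)
    store
  }
}
```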
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 581777de366ef..a6d1c044130f5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -195,4 +195,20 @@ private[spark] object History {
     .version("3.0.0")
     .booleanConf
     .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled")
+    .doc("Whether to use HybridStore as the store when parsing event logs. " +
+      "HybridStore will first write data to an in-memory store, with a background thread " +
+      "dumping data to a disk store after the write to the in-memory store is completed.")
+    .version("3.1.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  val MAX_IN_MEMORY_STORE_USAGE = ConfigBuilder("spark.history.store.hybridStore.maxMemoryUsage")
+    .doc("Maximum memory space that can be used to create HybridStore. The HybridStore is " +
+      "backed by heap memory, so the heap size should be increased through the memory " +
+      "option for the SHS if the HybridStore is enabled.")
+    .version("3.1.0")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefaultString("2g")
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 32959b77c4773..81c386aa90dd8 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -373,6 +373,25 @@ Security options for the Spark History Server are covered more detail in the
     </td>
     <td>3.0.0</td>
   </tr>
+  <tr>
+    <td><code>spark.history.store.hybridStore.enabled</code></td>
+    <td>false</td>
+    <td>
+      Whether to use HybridStore as the store when parsing event logs. HybridStore will first write
+      data to an in-memory store, with a background thread dumping data to a disk store after the
+      write to the in-memory store is completed.
+    </td>
+    <td>3.1.0</td>
+  </tr>
+  <tr>
+    <td><code>spark.history.store.hybridStore.maxMemoryUsage</code></td>
+    <td>2g</td>
+    <td>
+      Maximum memory space that can be used to create HybridStore. The HybridStore is backed by
+      heap memory, so the heap size should be increased through the memory option for the SHS
+      if the HybridStore is enabled.
+    </td>
+    <td>3.1.0</td>
+  </tr>
 </table>
 
 Note that in all of these UIs, the tables are sortable by clicking their headers,
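To enable the feature in a deployment, both settings go into the History Server's configuration, and the SHS heap needs headroom for the in-memory stores. An illustration of the equivalent settings expressed through SparkConf (deployments would normally put these in spark-defaults.conf; SPARK_DAEMON_MEMORY is the standard way to size the SHS JVM):

```scala
import org.apache.spark.SparkConf

// Illustration only: these would normally live in spark-defaults.conf.
val shsConf = new SparkConf()
  .set("spark.history.store.hybridStore.enabled", "true")
  // Budget shared by all applications being parsed concurrently.
  .set("spark.history.store.hybridStore.maxMemoryUsage", "4g")
// The hybrid store uses the History Server's own heap, so size the JVM
// accordingly, e.g. SPARK_DAEMON_MEMORY=8g in spark-env.sh.
```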