core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -128,6 +128,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)

private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED)

// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath()).toFile()
@@ -158,6 +160,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

private var memoryManager: HistoryServerMemoryManager = null
if (hybridStoreEnabled) {
memoryManager = new HistoryServerMemoryManager(conf)
}

private val fileCompactor = new EventLogFileCompactor(conf, hadoopConf, fs,
conf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN), conf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD))

@@ -262,6 +269,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private def startPolling(): Unit = {
diskManager.foreach(_.initialize())
if (memoryManager != null) {
memoryManager.initialize()
}

// Validate the log directory.
val path = new Path(logDir)
@@ -1167,6 +1177,95 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.

// If the hybrid store is enabled, try it first and fall back to the LevelDB store.
if (hybridStoreEnabled) {
try {
return createHybridStore(dm, appId, attempt, metadata)
} catch {
case e: Exception =>
logInfo(s"Failed to create HybridStore for $appId/${attempt.info.attemptId}." +
" Using LevelDB.", e)
}
}

createLevelDBStore(dm, appId, attempt, metadata)
}

private def createHybridStore(
dm: HistoryServerDiskManager,
appId: String,
attempt: AttemptInfoWrapper,
metadata: AppStatusStoreMetadata): KVStore = {
var retried = false
var hybridStore: HybridStore = null
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)

// Use InMemoryStore to rebuild app store
while (hybridStore == null) {
// A RuntimeException will be thrown if the heap memory is not sufficient
memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize,
reader.compressionCodec)
var store: HybridStore = null
try {
store = new HybridStore()
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
hybridStore = store
} catch {
case _: IOException if !retried =>
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " +
"trying again...")
store.close()
memoryManager.release(appId, attempt.info.attemptId)
retried = true
case e: Exception =>
store.close()
memoryManager.release(appId, attempt.info.attemptId)
throw e
}
}

// Create a LevelDB and start a background thread to dump data to LevelDB
var lease: dm.Lease = null
try {
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
val levelDB = KVUtils.open(lease.tmpPath, metadata)
hybridStore.setLevelDB(levelDB)
hybridStore.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
override def onSwitchToLevelDBSuccess(): Unit = {
logInfo(s"Completely switched to LevelDB for app $appId / ${attempt.info.attemptId}.")
levelDB.close()
val newStorePath = lease.commit(appId, attempt.info.attemptId)
hybridStore.setLevelDB(KVUtils.open(newStorePath, metadata))
memoryManager.release(appId, attempt.info.attemptId)
}
override def onSwitchToLevelDBFail(e: Exception): Unit = {
logWarning(s"Failed to switch to LevelDB for app $appId / ${attempt.info.attemptId}", e)
levelDB.close()
lease.rollback()
}
}, appId, attempt.info.attemptId)
} catch {
case e: Exception =>
hybridStore.close()
memoryManager.release(appId, attempt.info.attemptId)
if (lease != null) {
lease.rollback()
}
throw e
}

hybridStore
}

private def createLevelDBStore(
dm: HistoryServerDiskManager,
appId: String,
attempt: AttemptInfoWrapper,
metadata: AppStatusStoreMetadata): KVStore = {
var retried = false
var newStorePath: File = null
while (newStorePath == null) {
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala (new file, 85 additions)
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.HashMap

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.History._
import org.apache.spark.util.Utils

/**
* A class used to keep track of in-memory store usage by the SHS.
*/
private class HistoryServerMemoryManager(
conf: SparkConf) extends Logging {

private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
private val currentUsage = new AtomicLong(0L)
private val active = new HashMap[(String, Option[String]), Long]()

def initialize(): Unit = {
logInfo("Initialized memory manager: " +
s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
s"max usage = ${Utils.bytesToString(maxUsage)}")
}

def lease(
appId: String,
attemptId: Option[String],
eventLogSize: Long,
codec: Option[String]): Unit = {
val memoryUsage = approximateMemoryUsage(eventLogSize, codec)
if (memoryUsage + currentUsage.get > maxUsage) {
throw new RuntimeException("Not enough memory to create hybrid store " +
s"for app $appId / $attemptId.")
}
active.synchronized {
active(appId -> attemptId) = memoryUsage
}
currentUsage.addAndGet(memoryUsage)
logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " +
s"app $appId / $attemptId")
}

def release(appId: String, attemptId: Option[String]): Unit = {
val memoryUsage = active.synchronized { active.remove(appId -> attemptId) }

memoryUsage match {
case Some(m) =>
currentUsage.addAndGet(-m)
logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " +
s"app $appId / $attemptId")
Contributor: This should be exhaustive, according to the build result. If we don't do anything for the None, please add case None =>

Author: fixed.

case None =>
}
}

private def approximateMemoryUsage(eventLogSize: Long, codec: Option[String]): Long = {
codec match {
case Some("zstd") =>
eventLogSize * 10
case Some(_) =>
eventLogSize * 4
case None =>
eventLogSize / 2
}
}
}
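As a rough illustration of the sizing heuristic above (not part of the PR): a 100 MB event log would reserve about 1000 MB of heap when zstd-compressed, about 400 MB for other codecs, and about 50 MB when uncompressed. A minimal sketch of that arithmetic:

object MemoryEstimateExample {
  // Illustrative only: the heap reservation implied by approximateMemoryUsage
  // for a 100 MB event log under each codec case.
  val logSize: Long = 100L * 1024 * 1024     // 100 MB of event log on disk
  val zstdEstimate: Long = logSize * 10      // ~1000 MB: zstd logs expand the most when replayed
  val otherCodecEstimate: Long = logSize * 4 // ~400 MB for other compression codecs
  val plainEstimate: Long = logSize / 2      // ~50 MB: uncompressed logs shrink into the store
}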
core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala (new file, 185 additions)
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.io.IOException
import java.util.Collection
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._

import org.apache.spark.util.kvstore._

/**
* An implementation of KVStore that accelerates event logs loading.
*
* When rebuilding the application state from event logs, HybridStore will
* write data to InMemoryStore at first and use a background thread to dump
* data to LevelDB once the app store is restored. We don't expect write
* operations (except for caching) after switchToLevelDB() is called.
*/

private[history] class HybridStore extends KVStore {

private val inMemoryStore = new InMemoryStore()

private var levelDB: LevelDB = null

// Flag to indicate whether we should use inMemoryStore or levelDB
private val shouldUseInMemoryStore = new AtomicBoolean(true)

// Flag to indicate whether this hybrid store is closed; used to avoid
// starting the background thread after the store is closed
private val closed = new AtomicBoolean(false)

// A background thread that dumps data from inMemoryStore to levelDB
private var backgroundThread: Thread = null

// A hash map that stores all classes that had been written to inMemoryStore
private val klassMap = new ConcurrentHashMap[Class[_], Boolean]

override def getMetadata[T](klass: Class[T]): T = {
getStore().getMetadata(klass)
}

override def setMetadata(value: Object): Unit = {
getStore().setMetadata(value)
}

override def read[T](klass: Class[T], naturalKey: Object): T = {
getStore().read(klass, naturalKey)
}

override def write(value: Object): Unit = {
getStore().write(value)

if (backgroundThread == null) {
// New classes won't be dumped once the background thread is started
klassMap.putIfAbsent(value.getClass(), true)

Reviewer: Do we expect a write after a background thread has started? We might want to throw an IllegalStateException.

Author: The write operation is only allowed for CacheQuantile objects after rebuildAppStore() is finished. If we want to throw an IllegalStateException here, we need special logic to check whether the value is of class CacheQuantile. I think we would prefer to avoid that to keep the HybridStore as generic as possible.

Reviewer: May be worth mentioning CacheQuantile in the comment, as it can be recalculated.

Contributor: I don't think we should mention a specific class - we added the assumption in the class doc, trying to generalize the case. If we feel the class doc isn't enough then we can state the assumption here as well, but let's generalize.

Reviewer: Ah ok, I missed that, thanks.

}
}

override def delete(klass: Class[_], naturalKey: Object): Unit = {
if (backgroundThread != null) {
throw new IllegalStateException("delete() shouldn't be called after " +
"the hybrid store begins switching to levelDB")
}

getStore().delete(klass, naturalKey)
}

override def view[T](klass: Class[T]): KVStoreView[T] = {
getStore().view(klass)
}

override def count(klass: Class[_]): Long = {
getStore().count(klass)
}

override def count(klass: Class[_], index: String, indexedValue: Object): Long = {
getStore().count(klass, index, indexedValue)
}

override def close(): Unit = {
try {
closed.set(true)
if (backgroundThread != null && backgroundThread.isAlive()) {
// The background thread is still running, wait for it to finish
backgroundThread.join()
}
} finally {
inMemoryStore.close()
if (levelDB != null) {
Contributor: Probably we may want to guarantee this to be executed once regardless of an exception being thrown in join().

Author: In the current implementation, the background thread won't throw uncaught exceptions, so I think levelDB.close() is guaranteed to be executed. Here the try-catch block is trying to catch the IOException that might be thrown during levelDB.close().

Contributor (@HeartSaVioR, Jun 10, 2020): Isn't join() able to throw InterruptedException? That's not a runtime exception, but you're playing with Scala, which ignores checked exceptions, so you still need to be careful. And IMHO, in any case we must ensure both the LevelDB store and the in-memory store are closed, because otherwise that's a resource leak.

Contributor: Friendly reminder about the comment here.

Author: Sorry I missed this comment. Yeah, you are right, join() can throw InterruptedException. I will update the code.

levelDB.close()
}
}
}
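Following up on the review thread above, a minimal sketch of the fix the author agreed to make - guaranteeing both stores are closed even if join() throws - could look like the following. This illustrates the discussion and is not necessarily the exact code that landed:

override def close(): Unit = {
  try {
    closed.set(true)
    if (backgroundThread != null && backgroundThread.isAlive()) {
      // join() can throw InterruptedException, so the closes below sit in finally blocks.
      backgroundThread.join()
    }
  } finally {
    // Close both stores regardless of what happened above to avoid leaking either resource.
    try {
      inMemoryStore.close()
    } finally {
      if (levelDB != null) {
        levelDB.close()
      }
    }
  }
}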

override def removeAllByIndexValues[T](
klass: Class[T],
index: String,
indexValues: Collection[_]): Boolean = {
if (backgroundThread != null) {
throw new IllegalStateException("removeAllByIndexValues() shouldn't be " +
"called after the hybrid store begins switching to levelDB")
}

getStore().removeAllByIndexValues(klass, index, indexValues)
}

def setLevelDB(levelDB: LevelDB): Unit = {
this.levelDB = levelDB
}

/**
* This method is called when writing to the inMemoryStore is done. A background
* thread will be created and started to dump the data in inMemoryStore to levelDB.
* Once the dump is completed, the underlying kvstore will be switched to levelDB.
*/
def switchToLevelDB(
listener: HybridStore.SwitchToLevelDBListener,
appId: String,
attemptId: Option[String]): Unit = {
if (closed.get) {
return
}

backgroundThread = new Thread(() => {
try {
for (klass <- klassMap.keys().asScala) {
val it = inMemoryStore.view(klass).closeableIterator()
while (it.hasNext()) {
levelDB.write(it.next())
}
Contributor: Drive-by comment, given I added something similar to an in-house patch: add a write(Iterator<E> values) to the kv store - this should make this switch order(s) of magnitude faster.

Author: Thanks for the information, I will try adding a write(Iterator<E> values) in this PR.

Contributor (@mridulm, Jul 8, 2020): If this helps, this is what I had written up for LevelDB - for the memory store, the default list traversal + write is good enough:

  @Override
  public void write(List<?> values) throws Exception {
    Preconditions.checkArgument(values != null && !values.isEmpty(),
      "Non-empty values required.");

    // Group by class, in case there are values from different classes in the iterator
    // Typical usecase is for this to be a single class.
    for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
            values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {

      final Iterator<?> valueIter = entry.getValue().iterator();
      final Iterator<byte[]> serializedValueIter;

      {
        // deserialize outside synchronized block
        List<byte[]> list = new ArrayList<>(entry.getValue().size());
        for (Object value : entry.getValue()) {
          list.add(serializer.serialize(value));
        }
        serializedValueIter = list.iterator();
      }

      final Class<?> valueClass = entry.getKey();
      final LevelDBTypeInfo ti = getTypeInfo(valueClass);

      // Batching updates per type
      synchronized (ti) {
        final LevelDBTypeInfo.Index naturalIndex = ti.naturalIndex();
        final Collection<LevelDBTypeInfo.Index> indices = ti.indices();

        try (WriteBatch batch = db().createWriteBatch()) {
          while (valueIter.hasNext()) {
            final Object value = valueIter.next();

            assert serializedValueIter.hasNext();
            final byte[] serializedObject = serializedValueIter.next();

            Object existing;
            try {
              existing = get(naturalIndex.entityKey(null, value), valueClass);
            } catch (NoSuchElementException e) {
              existing = null;
            }

            PrefixCache cache = new PrefixCache(value);
            byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
            for (LevelDBTypeInfo.Index idx : indices) {
              byte[] prefix = cache.getPrefix(idx);
              idx.add(batch, value, existing, serializedObject, naturalKey, prefix);
            }
          }
          assert !serializedValueIter.hasNext();
          db().write(batch);
        }
      }
    }
  }

Author: Thanks, this is helpful!

Author (@baohe-zhang, Jul 9, 2020): Hi @mridulm, I updated your code and used it for the in-memory store to LevelDB switching, but only saw a small improvement in switching time. I am not sure if something is wrong somewhere.

Workload (jobs x tasks per job):            2x400  10x400  50x400  100x400  200x400  500x400  1000x400  5x100000
Original switching time:                    1s     2s      4s      8s       16s      37s      65s       90s
Switching time with write(Iterator iter):   1s     1s      4s      7s       13s      34s      58s       84s

The code:

        for (klass <- klassMap.keys().asScala) {
          val it = inMemoryStore.view(klass).closeableIterator()
          levelDB.write(it)
        }
  public <T> void write(Iterator<T> iter) throws Exception {
    Preconditions.checkArgument(iter != null, "Non-empty values required.");

    List<T> values = new ArrayList<>();
    iter.forEachRemaining(values::add);

    // Group by class, in case there are values from different classes in the iterator
    // Typical usecase is for this to be a single class.
    for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
            values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {

      final Iterator<?> valueIter = entry.getValue().iterator();
      final Iterator<byte[]> serializedValueIter;

      {
        // deserialize outside synchronized block
        List<byte[]> list = new ArrayList<>(entry.getValue().size());
        for (Object value : entry.getValue()) {
          list.add(serializer.serialize(value));
        }
        serializedValueIter = list.iterator();
      }

      final Class<?> valueClass = entry.getKey();
      final LevelDBTypeInfo ti = getTypeInfo(valueClass);

      // Batching updates per type
      synchronized (ti) {
        final LevelDBTypeInfo.Index naturalIndex = ti.naturalIndex();
        final Collection<LevelDBTypeInfo.Index> indices = ti.indices();

        try (WriteBatch batch = db().createWriteBatch()) {
          while (valueIter.hasNext()) {
            final Object value = valueIter.next();

            assert serializedValueIter.hasNext();
            final byte[] serializedObject = serializedValueIter.next();

            Object existing;
            try {
              existing = get(naturalIndex.entityKey(null, value), valueClass);
            } catch (NoSuchElementException e) {
              existing = null;
            }

            PrefixCache cache = new PrefixCache(value);
            byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
            for (LevelDBTypeInfo.Index idx : indices) {
              byte[] prefix = cache.getPrefix(idx);
              idx.add(batch, value, existing, serializedObject, naturalKey, prefix);
            }
          }
          assert !serializedValueIter.hasNext();
          db().write(batch);
        }
      }
    }
  }

I think using multiple threads to write data to leveldb might shorten the switching time but it would introduce more overhead to SHS.

Contributor: It is a function of how loaded your disk is, the IOPS it can sustain, and the transactions LevelDB can do concurrently.

Author: Makes sense. I am testing it on a Mac which has an SSD and the disk is not busy. I think the improvement might be more obvious on an HDD or a busy disk. @HeartSaVioR @tgravescs Do we need to add batch write support for LevelDB in this PR?

Contributor: I don't think it's mandatory. You can file another issue as an "improvement" for this, but IMHO working on this is completely optional for you. I think we have already asked for so many things to do.

Author: Sounds good. We can improve that afterward.

}
listener.onSwitchToLevelDBSuccess()
shouldUseInMemoryStore.set(false)
inMemoryStore.close()
} catch {
case e: Exception =>
listener.onSwitchToLevelDBFail(e)
Contributor: This line can be moved to the catch statement.

Author: done.

}
})
backgroundThread.setDaemon(true)
backgroundThread.setName(s"hybridstore-$appId-$attemptId")
backgroundThread.start()
}

/**
* This method returns the store that we should use.
*/
private def getStore(): KVStore = {
if (shouldUseInMemoryStore.get) {
inMemoryStore
} else {
levelDB
}
}
}

private[history] object HybridStore {

trait SwitchToLevelDBListener {

def onSwitchToLevelDBSuccess(): Unit

def onSwitchToLevelDBFail(e: Exception): Unit
}
}
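To recap how the pieces fit together, the lifecycle driven by createHybridStore in FsHistoryProvider (shown in the diff above) is roughly the following. This is a hedged sketch; the surrounding values (reader, lease, metadata, attempt) are assumed from that method, not additional PR code:

// Sketch of the intended HybridStore lifecycle, condensed from createHybridStore above.
val store = new HybridStore()
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())  // replay writes land in the InMemoryStore

store.setLevelDB(KVUtils.open(lease.tmpPath, metadata))
store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
  override def onSwitchToLevelDBSuccess(): Unit = {
    // Background dump finished: reads are now served from LevelDB and the
    // in-memory lease can be released.
  }
  override def onSwitchToLevelDBFail(e: Exception): Unit = {
    // Dump failed: roll back the disk lease; the store keeps serving from memory.
  }
}, appId, attempt.info.attemptId)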