diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
index 7fc0722233854..00e58cbdc57b9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
@@ -33,8 +33,9 @@ private class HistoryServerMemoryManager(
     conf: SparkConf) extends Logging {
 
   private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
-  private val currentUsage = new AtomicLong(0L)
-  private val active = new HashMap[(String, Option[String]), Long]()
+  // Visible for testing.
+  private[history] val currentUsage = new AtomicLong(0L)
+  private[history] val active = new HashMap[(String, Option[String]), Long]()
 
   def initialize(): Unit = {
     logInfo("Initialized memory manager: " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
index 08db2bd0766c3..58714f16e8417 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
@@ -54,7 +54,8 @@ private[history] class HybridStore extends KVStore {
   private var backgroundThread: Thread = null
 
   // A hash map that stores all classes that have been written to inMemoryStore
-  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+  // Visible for testing
+  private[history] val klassMap = new ConcurrentHashMap[Class[_], Boolean]
 
   override def getMetadata[T](klass: Class[T]): T = {
     getStore().getMetadata(klass)
@@ -165,8 +166,9 @@ private[history] class HybridStore extends KVStore {
 
   /**
   * This method returns the store that should be used.
+   * Visible for testing.
   */
-  private def getStore(): KVStore = {
+  private[history] def getStore(): KVStore = {
    if (shouldUseInMemoryStore.get) {
      inMemoryStore
    } else {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 21a99a462aa1e..3f8c875f5a552 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -90,9 +90,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
 
-  private def testAppLogParsing(inMemory: Boolean): Unit = {
+  test("SPARK-31608: parse application logs with HybridStore") {
+    testAppLogParsing(false, true)
+  }
+
+  private def testAppLogParsing(inMemory: Boolean, useHybridStore: Boolean = false): Unit = {
     val clock = new ManualClock(12345678)
-    val conf = createTestConf(inMemory = inMemory)
+    val conf = createTestConf(inMemory = inMemory, useHybridStore = useHybridStore)
     val provider = new FsHistoryProvider(conf, clock)
 
     // Write a new-style application log.
@@ -1509,7 +1513,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     new FileOutputStream(file).close()
   }
 
-  private def createTestConf(inMemory: Boolean = false): SparkConf = {
+  private def createTestConf(
+      inMemory: Boolean = false,
+      useHybridStore: Boolean = false): SparkConf = {
     val conf = new SparkConf()
       .set(HISTORY_LOG_DIR, testDir.getAbsolutePath())
       .set(FAST_IN_PROGRESS_PARSING, true)
@@ -1517,6 +1523,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     if (!inMemory) {
       conf.set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
     }
+    conf.set(HYBRID_STORE_ENABLED, useHybridStore)
 
     conf
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerMemoryManagerSuite.scala
new file mode 100644
index 0000000000000..697f8f72624e5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerMemoryManagerSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.History._
+
+class HistoryServerMemoryManagerSuite extends SparkFunSuite {
+
+  private val MAX_USAGE = 3L
+
+  test("lease and release memory") {
+    val conf = new SparkConf().set(MAX_IN_MEMORY_STORE_USAGE, MAX_USAGE)
+    val manager = new HistoryServerMemoryManager(conf)
+
+    // Memory usage estimation for a non-compressed log file is file size / 2
+    manager.lease("app1", None, 2L, None)
+    manager.lease("app2", None, 2L, None)
+    manager.lease("app3", None, 2L, None)
+    assert(manager.currentUsage.get === 3L)
+    assert(manager.active.size === 3)
+    assert(manager.active.get(("app1", None)) === Some(1L))
+
+    intercept[RuntimeException] {
+      manager.lease("app4", None, 2L, None)
+    }
+
+    // Releasing a non-existent app is a no-op
+    manager.release("app4", None)
+    assert(manager.currentUsage.get === 3L)
+
+    manager.release("app1", None)
+    assert(manager.currentUsage.get === 2L)
+    assert(manager.active.size === 2)
+
+    manager.lease("app4", None, 2L, None)
+    assert(manager.currentUsage.get === 3L)
+    assert(manager.active.size === 3)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
new file mode 100644
index 0000000000000..fa57049b1a770
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+import java.util.NoSuchElementException
+import java.util.concurrent.LinkedBlockingQueue
+
+import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.TimeLimits
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.status.KVUtils._
+import org.apache.spark.util.kvstore._
+
+class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits {
+
+  private var db: LevelDB = _
+  private var dbpath: File = _
+
+  before {
+    dbpath = File.createTempFile("test.", ".ldb")
+    dbpath.delete()
+    db = new LevelDB(dbpath, new KVStoreScalaSerializer())
+  }
+
+  after {
+    if (db != null) {
+      db.close()
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath)
+    }
+  }
+
+  test("test multiple objects write read delete") {
+    val store = createHybridStore()
+
+    val t1 = createCustomType1(1)
+    val t2 = createCustomType1(2)
+
+    intercept[NoSuchElementException] {
+      store.read(t1.getClass(), t1.key)
+    }
+
+    store.write(t1)
+    store.write(t2)
+    store.delete(t2.getClass(), t2.key)
+
+    Seq(false, true).foreach { switch =>
+      if (switch) switchHybridStore(store)
+
+      intercept[NoSuchElementException] {
+        store.read(t2.getClass(), t2.key)
+      }
+      assert(store.read(t1.getClass(), t1.key) === t1)
+      assert(store.count(t1.getClass()) === 1L)
+    }
+  }
+
+  test("test metadata") {
+    val store = createHybridStore()
+    assert(store.getMetadata(classOf[CustomType1]) === null)
+
+    val t1 = createCustomType1(1)
+    store.setMetadata(t1)
+    assert(store.getMetadata(classOf[CustomType1]) === t1)
+
+    // Switch to LevelDB and set new metadata
+    switchHybridStore(store)
+
+    val t2 = createCustomType1(2)
+    store.setMetadata(t2)
+    assert(store.getMetadata(classOf[CustomType1]) === t2)
+  }
+
+  test("test update") {
+    val store = createHybridStore()
+    val t = createCustomType1(1)
+
+    store.write(t)
+    t.name = "name2"
+    store.write(t)
+
+    Seq(false, true).foreach { switch =>
+      if (switch) switchHybridStore(store)
+
+      assert(store.count(t.getClass()) === 1L)
+      assert(store.read(t.getClass(), t.key) === t)
+    }
+  }
+
+  test("test basic iteration") {
+    val store = createHybridStore()
+
+    val t1 = createCustomType1(1)
+    store.write(t1)
+    val t2 = createCustomType1(2)
+    store.write(t2)
+
+    Seq(false, true).foreach { switch =>
+      if (switch) switchHybridStore(store)
+
+      assert(store.view(t1.getClass()).iterator().next().id === t1.id)
+      assert(store.view(t1.getClass()).skip(1).iterator().next().id === t2.id)
+      assert(store.view(t1.getClass()).skip(1).max(1).iterator().next().id === t2.id)
+      assert(store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id === t1.id)
+      assert(store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id === t2.id)
+    }
+  }
+
+  test("test delete after switch") {
+    val store = createHybridStore()
+    val t = createCustomType1(1)
+    store.write(t)
+    switchHybridStore(store)
+    intercept[IllegalStateException] {
+      store.delete(t.getClass(), t.key)
+    }
+  }
+
+  test("test klassMap") {
+    val store = createHybridStore()
+    val t1 = createCustomType1(1)
+    store.write(t1)
+    assert(store.klassMap.size === 1)
+    val t2 = new CustomType2("key2")
+    store.write(t2)
+    assert(store.klassMap.size === 2)
+
+    switchHybridStore(store)
+    val t3 = new CustomType3("key3")
+    store.write(t3)
+    // Cannot put a new klass into klassMap after the switch starts
+    assert(store.klassMap.size === 2)
+  }
+
+  private def createHybridStore(): HybridStore = {
+    val store = new HybridStore()
+    store.setLevelDB(db)
+    store
+  }
+
+  private def createCustomType1(i: Int): CustomType1 = {
+    new CustomType1("key" + i, "id" + i, "name" + i, i, "child" + i)
+  }
+
+  private def switchHybridStore(store: HybridStore): Unit = {
+    assert(store.getStore().isInstanceOf[InMemoryStore])
+    val listener = new SwitchListener()
+    store.switchToLevelDB(listener, "test", None)
+    failAfter(2.seconds) {
+      assert(listener.waitUntilDone())
+    }
+    while (!store.getStore().isInstanceOf[LevelDB]) {
+      Thread.sleep(10)
+    }
+  }
+
+  private class SwitchListener extends HybridStore.SwitchToLevelDBListener {
+
+    // Puts true into the queue when the switch succeeds, and false when it fails.
+    private val results = new LinkedBlockingQueue[Boolean]()
+
+    override def onSwitchToLevelDBSuccess(): Unit = {
+      try {
+        results.put(true)
+      } catch {
+        case _: InterruptedException =>
+          // no-op
+      }
+    }
+
+    override def onSwitchToLevelDBFail(e: Exception): Unit = {
+      try {
+        results.put(false)
+      } catch {
+        case _: InterruptedException =>
+          // no-op
+      }
+    }
+
+    def waitUntilDone(): Boolean = {
+      results.take()
+    }
+  }
+}
+
+class CustomType1(
+    @KVIndexParam var key: String,
+    @KVIndexParam("id") var id: String,
+    @KVIndexParam(value = "name", copy = true) var name: String,
+    @KVIndexParam("int") var num: Int,
+    @KVIndexParam(value = "child", parent = "id") var child: String) {
+
+  override def equals(o: Any): Boolean = {
+    o match {
+      case t: CustomType1 =>
+        id.equals(t.id) && name.equals(t.name)
+      case _ => false
+    }
+  }
+
+  override def hashCode: Int = {
+    id.hashCode
+  }
+
+  override def toString: String = {
+    "CustomType1[key=" + key + ",id=" + id + ",name=" + name + ",num=" + num + "]"
+  }
+}
+
+class CustomType2(@KVIndexParam var key: String) {}
+
+class CustomType3(@KVIndexParam var key: String) {}
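
Note: for readers following the patch, here is a minimal sketch of the lifecycle the new suites exercise: write to the in-memory side, trigger the background switch, then read from whichever store is current. It is illustrative only, not part of the patch: `SketchType` and `HybridStoreSketch` are hypothetical names, and the code must live in the `org.apache.spark.deploy.history` package because `HybridStore`, `setLevelDB`, and `switchToLevelDB` are `private[history]`.

package org.apache.spark.deploy.history

import java.io.File

import org.apache.spark.status.KVUtils._
import org.apache.spark.util.kvstore._

// Hypothetical key type for the sketch; any class with a @KVIndexParam key works.
class SketchType(@KVIndexParam var key: String)

object HybridStoreSketch {
  def main(args: Array[String]): Unit = {
    // A throwaway LevelDB directory, mirroring the suite's before() block.
    val path = File.createTempFile("sketch.", ".ldb")
    path.delete()

    val store = new HybridStore()
    // Back the store with a LevelDB instance for the eventual switch.
    store.setLevelDB(new LevelDB(path, new KVStoreScalaSerializer()))

    // While parsing is in flight, writes land in the InMemoryStore.
    store.write(new SketchType("key1"))

    // Kick off the background dump; getStore() flips to LevelDB only on success.
    store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
      override def onSwitchToLevelDBSuccess(): Unit = println("switch done")
      override def onSwitchToLevelDBFail(e: Exception): Unit = e.printStackTrace()
    }, "app1", None)

    // Reads work throughout; the backing store is an implementation detail.
    assert(store.read(classOf[SketchType], "key1").key == "key1")
  }
}

Because the switch runs on the store's background thread, the listener callbacks are how a caller learns when the in-memory copy can be dropped; the suites above instead block on a queue (SwitchListener) to keep the tests deterministic.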