@@ -33,8 +33,9 @@ private class HistoryServerMemoryManager(
     conf: SparkConf) extends Logging {
 
   private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
-  private val currentUsage = new AtomicLong(0L)
-  private val active = new HashMap[(String, Option[String]), Long]()
+  // Visible for testing.
+  private[history] val currentUsage = new AtomicLong(0L)
+  private[history] val active = new HashMap[(String, Option[String]), Long]()
 
   def initialize(): Unit = {
     logInfo("Initialized memory manager: " +
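A note on the pattern above: Scala's qualified access modifier `private[history]` makes a member visible throughout the `org.apache.spark.deploy.history` package, so the suites below can assert on internal state without making it public. A minimal sketch with a hypothetical class (not from this PR):

    package org.apache.spark.deploy.history

    import java.util.concurrent.atomic.AtomicLong

    // Hypothetical example: a private[history] member is visible to any code
    // under org.apache.spark.deploy.history, including test suites in the
    // same package, but stays hidden from the rest of Spark.
    class Counter {
      private[history] val hits = new AtomicLong(0L)
      def record(): Unit = { hits.incrementAndGet() }
    }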
@@ -54,7 +54,8 @@ private[history] class HybridStore extends KVStore {
   private var backgroundThread: Thread = null
 
   // A hash map that stores all classes that have been written to inMemoryStore
-  private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
+  // Visible for testing
+  private[history] val klassMap = new ConcurrentHashMap[Class[_], Boolean]
 
   override def getMetadata[T](klass: Class[T]): T = {
     getStore().getMetadata(klass)
@@ -165,8 +166,9 @@ private[history] class HybridStore extends KVStore {
 
   /**
    * This method returns the store that we should use.
+   * Visible for testing.
    */
-  private def getStore(): KVStore = {
+  private[history] def getStore(): KVStore = {
     if (shouldUseInMemoryStore.get) {
       inMemoryStore
     } else {
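The getStore() hunk above is the heart of HybridStore: reads are served by the InMemoryStore while parsing is in flight, then by LevelDB once the switch completes. A condensed sketch of that shape, with field names taken from the diff and the body paraphrased (this is not the full class):

    import java.util.concurrent.atomic.AtomicBoolean

    import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, LevelDB}

    // Condensed shape only, not the real HybridStore.
    class HybridStoreShape {
      private val inMemoryStore = new InMemoryStore()
      private var levelDB: LevelDB = null
      // Stays true until the background copy into LevelDB completes.
      private val shouldUseInMemoryStore = new AtomicBoolean(true)

      def setLevelDB(db: LevelDB): Unit = { levelDB = db }

      // Every read and write is routed through this dispatch.
      def getStore(): KVStore = {
        if (shouldUseInMemoryStore.get) inMemoryStore else levelDB
      }
    }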
@@ -90,9 +90,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
 
-  private def testAppLogParsing(inMemory: Boolean): Unit = {
+  test("SPARK-31608: parse application logs with HybridStore") {
+    testAppLogParsing(false, true)
+  }
+
+  private def testAppLogParsing(inMemory: Boolean, useHybridStore: Boolean = false): Unit = {
     val clock = new ManualClock(12345678)
-    val conf = createTestConf(inMemory = inMemory)
+    val conf = createTestConf(inMemory = inMemory, useHybridStore = useHybridStore)
     val provider = new FsHistoryProvider(conf, clock)
 
     // Write a new-style application log.
@@ -1509,14 +1513,17 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     new FileOutputStream(file).close()
   }
 
-  private def createTestConf(inMemory: Boolean = false): SparkConf = {
+  private def createTestConf(
+      inMemory: Boolean = false,
+      useHybridStore: Boolean = false): SparkConf = {
     val conf = new SparkConf()
       .set(HISTORY_LOG_DIR, testDir.getAbsolutePath())
       .set(FAST_IN_PROGRESS_PARSING, true)
 
     if (!inMemory) {
       conf.set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
     }
+    conf.set(HYBRID_STORE_ENABLED, useHybridStore)
 
     conf
   }
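The plumbing above just threads a boolean through to HYBRID_STORE_ENABLED. Outside of tests, the same knobs shown in these suites would be set on the History Server's SparkConf. A hedged example using the config constants that appear in this PR (the byte value is arbitrary, chosen only for illustration, and I am assuming MAX_IN_MEMORY_STORE_USAGE is a byte count):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.History._

    val conf = new SparkConf()
      .set(HYBRID_STORE_ENABLED, true)
      .set(MAX_IN_MEMORY_STORE_USAGE, 2L * 1024 * 1024 * 1024) // assumed byte count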
@@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.history

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.History._

class HistoryServerMemoryManagerSuite extends SparkFunSuite {

  private val MAX_USAGE = 3L

  test("lease and release memory") {
    val conf = new SparkConf().set(MAX_IN_MEMORY_STORE_USAGE, MAX_USAGE)
    val manager = new HistoryServerMemoryManager(conf)

    // Memory usage estimation for non-compressed log file is filesize / 2
    manager.lease("app1", None, 2L, None)
    manager.lease("app2", None, 2L, None)
    manager.lease("app3", None, 2L, None)
    assert(manager.currentUsage.get === 3L)
    assert(manager.active.size === 3)
    assert(manager.active.get(("app1", None)) === Some(1L))

    intercept[RuntimeException] {
      manager.lease("app4", None, 2L, None)
    }

    // Releasing a non-existent app is a no-op
    manager.release("app4", None)
    assert(manager.currentUsage.get === 3L)

    manager.release("app1", None)
    assert(manager.currentUsage.get === 2L)
    assert(manager.active.size === 2)

    manager.lease("app4", None, 2L, None)
    assert(manager.currentUsage.get === 3L)
    assert(manager.active.size === 3)
  }
}
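The suite pins down the manager's contract: a lease for a non-compressed log reserves filesize / 2, a lease that would push usage past MAX_IN_MEMORY_STORE_USAGE throws, and releasing an app that never leased is a no-op. A minimal sketch of that contract, reconstructed from the assertions alone (the field and method names are real; the body is my paraphrase, and the real HistoryServerMemoryManager may differ in details such as compressed-log estimates and the exception message):

    import java.util.concurrent.atomic.AtomicLong

    import scala.collection.mutable.HashMap

    class MemoryManagerSketch(maxUsage: Long) {
      val currentUsage = new AtomicLong(0L)
      val active = new HashMap[(String, Option[String]), Long]()

      def lease(appId: String, attemptId: Option[String],
          eventLogSize: Long, codec: Option[String]): Unit = {
        // Non-compressed estimate per the test's comment; compressed heuristics omitted.
        val memoryUsage = eventLogSize / 2
        if (currentUsage.get + memoryUsage > maxUsage) {
          throw new RuntimeException("Not enough memory to create hybrid store.")
        }
        active.synchronized { active((appId, attemptId)) = memoryUsage }
        currentUsage.addAndGet(memoryUsage)
      }

      def release(appId: String, attemptId: Option[String]): Unit = {
        // Removing an absent key yields None, so an unknown app is a no-op.
        active.synchronized { active.remove((appId, attemptId)) }.foreach { m =>
          currentUsage.addAndGet(-m)
        }
      }
    }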
@@ -0,0 +1,232 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.history

import java.io.File
import java.util.NoSuchElementException
import java.util.concurrent.LinkedBlockingQueue

import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.TimeLimits
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkFunSuite
import org.apache.spark.status.KVUtils._
import org.apache.spark.util.kvstore._

class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits {

  private var db: LevelDB = _
  private var dbpath: File = _

  before {
    dbpath = File.createTempFile("test.", ".ldb")
    dbpath.delete()
    db = new LevelDB(dbpath, new KVStoreScalaSerializer())
  }

  after {
    if (db != null) {
      db.close()
    }
    if (dbpath != null) {
      FileUtils.deleteQuietly(dbpath)
    }
  }
test("test multiple objects write read delete") {
val store = createHybridStore()

val t1 = createCustomType1(1)
val t2 = createCustomType1(2)

intercept[NoSuchElementException] {
store.read(t1.getClass(), t1.key)
}

store.write(t1)
store.write(t2)
store.delete(t2.getClass(), t2.key)

Seq(false, true).foreach { switch =>
if (switch) switchHybridStore(store)

intercept[NoSuchElementException] {
store.read(t2.getClass(), t2.key)
}
assert(store.read(t1.getClass(), t1.key) === t1)
assert(store.count(t1.getClass()) === 1L)
}
}

test("test metadata") {
val store = createHybridStore()
assert(store.getMetadata(classOf[CustomType1]) === null)

val t1 = createCustomType1(1)
store.setMetadata(t1)
assert(store.getMetadata(classOf[CustomType1]) === t1)

// Switch to LevelDB and set a new metadata
switchHybridStore(store)

val t2 = createCustomType1(2)
store.setMetadata(t2)
assert(store.getMetadata(classOf[CustomType1]) === t2)
}

test("test update") {
val store = createHybridStore()
val t = createCustomType1(1)

store.write(t)
t.name = "name2"
store.write(t)

Seq(false, true).foreach { switch =>
if (switch) switchHybridStore(store)

assert(store.count(t.getClass()) === 1L)
assert(store.read(t.getClass(), t.key) === t)
}
}

test("test basic iteration") {
val store = createHybridStore()

val t1 = createCustomType1(1)
store.write(t1)
val t2 = createCustomType1(2)
store.write(t2)

Seq(false, true).foreach { switch =>
if (switch) switchHybridStore(store)

assert(store.view(t1.getClass()).iterator().next().id === t1.id)
assert(store.view(t1.getClass()).skip(1).iterator().next().id === t2.id)
assert(store.view(t1.getClass()).skip(1).max(1).iterator().next().id === t2.id)
assert(store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id === t1.id)
assert(store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id === t2.id)
}
}

test("test delete after switch") {
val store = createHybridStore()
val t = createCustomType1(1)
store.write(t)
switchHybridStore(store)
intercept[IllegalStateException] {
store.delete(t.getClass(), t.key)
}
}

test("test klassMap") {
val store = createHybridStore()
val t1 = createCustomType1(1)
store.write(t1)
assert(store.klassMap.size === 1)
val t2 = new CustomType2("key2")
store.write(t2)
assert(store.klassMap.size === 2)

switchHybridStore(store)
val t3 = new CustomType3("key3")
store.write(t3)
// Cannot put new klass to klassMap after the switching starts
assert(store.klassMap.size === 2)
}

private def createHybridStore(): HybridStore = {
val store = new HybridStore()
store.setLevelDB(db)
store
}

private def createCustomType1(i: Int): CustomType1 = {
new CustomType1("key" + i, "id" + i, "name" + i, i, "child" + i)
}

  private def switchHybridStore(store: HybridStore): Unit = {
    assert(store.getStore().isInstanceOf[InMemoryStore])
    val listener = new SwitchListener()
    store.switchToLevelDB(listener, "test", None)
    failAfter(2.seconds) {
      assert(listener.waitUntilDone())
    }
    while (!store.getStore().isInstanceOf[LevelDB]) {
      Thread.sleep(10)
    }
  }

Review thread on the polling loop above:

Contributor: Would we need to wait more after waiting on the listener? Or is this just to make sure?

Author: There is a potential race in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala#L154, so we need this while loop to ensure shouldUseInMemoryStore has been set to false.
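A sketch of the race the author is pointing at, assuming the ordering in the linked HybridStore.switchToLevelDB background thread (paraphrased, not the verbatim implementation):

    // Background switch thread, in order:
    //   1. copy every class recorded in klassMap from inMemoryStore into levelDB
    //   2. listener.onSwitchToLevelDBSuccess()   <- waitUntilDone() unblocks here
    //   3. shouldUseInMemoryStore.set(false)     <- getStore() flips to LevelDB only now
    // Between steps 2 and 3, getStore() can still return the InMemoryStore,
    // which is why the test keeps polling until the flag flips.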

  private class SwitchListener extends HybridStore.SwitchToLevelDBListener {

    // Puts true on the queue when the switch succeeds, and false when it fails.
    private val results = new LinkedBlockingQueue[Boolean]()

    override def onSwitchToLevelDBSuccess(): Unit = {
      try {
        results.put(true)
      } catch {
        case _: InterruptedException =>
          // no-op
      }
    }

    override def onSwitchToLevelDBFail(e: Exception): Unit = {
      try {
        results.put(false)
      } catch {
        case _: InterruptedException =>
          // no-op
      }
    }

    def waitUntilDone(): Boolean = {
      results.take()
    }
  }
}

class CustomType1(
    @KVIndexParam var key: String,
    @KVIndexParam("id") var id: String,
    @KVIndexParam(value = "name", copy = true) var name: String,
    @KVIndexParam("int") var num: Int,
    @KVIndexParam(value = "child", parent = "id") var child: String) {

  override def equals(o: Any): Boolean = {
    o match {
      case t: CustomType1 =>
        id.equals(t.id) && name.equals(t.name)
      case _ => false
    }
  }

  override def hashCode: Int = {
    id.hashCode
  }

  override def toString: String = {
    "CustomType1[key=" + key + ",id=" + id + ",name=" + name + ",num=" + num + "]"
  }
}

class CustomType2(@KVIndexParam var key: String) {}

class CustomType3(@KVIndexParam var key: String) {}
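CustomType1's @KVIndexParam annotations are what make the view() assertions above work: "name" (with copy = true) and "child" (with parent = "id") declare secondary indices on top of the natural key. A hedged illustration of reading through the "name" index with the standard KVStoreView chain, written against the suite's helpers (this is not one of the PR's tests):

    val store = createHybridStore()
    (1 to 3).foreach { i => store.write(createCustomType1(i)) }

    // Iterate in "name" order starting from "name2"; prints name2 then name3.
    val it = store.view(classOf[CustomType1]).index("name").first("name2").closeableIterator()
    try {
      while (it.hasNext()) {
        println(it.next().name)
      }
    } finally {
      it.close()
    }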