SnappyResourceEventsDUnitTest.scala (new file)
@@ -0,0 +1,126 @@
package io.snappydata.dunit.cluster

import scala.Predef._

import com.gemstone.gemfire.internal.cache.GemFireCacheImpl
import com.gemstone.gemfire.internal.cache.control.{HeapMemoryMonitor, InternalResourceManager}
import com.pivotal.gemfirexd.internal.engine.Misc
import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils
import dunit.DistributedTestBase
import io.snappydata.ServiceManager

import org.apache.spark.SparkEnv
import org.apache.spark.storage.RDDInfo

/**
 * Created by shirishd on 19/10/15.
 */
class SnappyResourceEventsDUnitTest(s: String) extends ClusterManagerTestBase(s) {

  override def tearDown2(): Unit = {
    Array(vm3, vm2, vm1, vm0).foreach(_.invoke(this.getClass, "resetGFResourceManager"))
    super.tearDown2()
  }

  def testCriticalUp(): Unit = {
    vm1.invoke(this.getClass, "startSnappyServer", startArgs)
    vm0.invoke(this.getClass, "startSnappyLead", startArgs)

    // Execute a job that caches an RDD, then raise a CRITICAL UP event on the
    // server and verify that further caching and shuffle allocations are rejected
    vm0.invoke(this.getClass, "runSparkJob")
    vm1.invoke(this.getClass, "raiseCriticalUpMemoryEvent")
    vm0.invoke(this.getClass, "runSparkJobAfterThresholdBreach")

    vm1.invoke(this.getClass, "assertShuffleMemoryManagerBehavior")
  }

  def testEvictionUp(): Unit = {
    vm1.invoke(this.getClass, "startSnappyServer", startArgs)
    vm0.invoke(this.getClass, "startSnappyLead", startArgs)

    // Execute a job that caches an RDD, then raise an EVICTION UP event on the
    // server and verify that further caching does not increase memory usage
    vm0.invoke(this.getClass, "runSparkJob")
    vm1.invoke(this.getClass, "raiseEvictionUpMemoryEvent")
    vm0.invoke(this.getClass, "runSparkJobAfterThresholdBreach")
  }
}

object SnappyResourceEventsDUnitTest extends ClusterManagerTestUtils {

  def runSparkJob(): Unit = {
    val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache()
    println(rdd1.count)
    assert(sc.getRDDStorageInfo.nonEmpty)
  }

  def assertShuffleMemoryManagerBehavior(): Unit = {
    // SnappyShuffleMemoryManager must refuse new allocations while CRITICAL UP is in effect
    assert(SparkEnv.get.shuffleMemoryManager.tryToAcquire(1000) == 0)
  }

  def getInMemorySizeForCachedRDDs: Long = {
    val rddInfo: Array[RDDInfo] = sc.getRDDStorageInfo
    rddInfo.map(_.memSize).sum
  }

  def runSparkJobAfterThresholdBreach(): Unit = {
    val sum1: Long = getInMemorySizeForCachedRDDs
    println("1. cached rdd mem size before caching rdd when critical or eviction up = " + sum1)

    val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache()
    println(rdd2.count)
    val sum2: Long = getInMemorySizeForCachedRDDs
    println("2. cached rdd mem size after caching first rdd when critical or eviction up = " + sum2)
    // make sure that caching a new RDD after a CRITICAL or EVICTION UP event
    // does not result in increased memory usage
    assert(sum2 <= sum1)

    val rdd3 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache()
    println(rdd3.count)
    val sum3: Long = getInMemorySizeForCachedRDDs
    println("3. cached rdd mem size after caching second rdd when critical or eviction up = " + sum3)
    // make sure that caching another RDD still does not increase memory usage
    assert(sum3 <= sum2)
  }

  def raiseCriticalUpMemoryEvent(): Unit = {
    println("About to raise CRITICAL UP event")
    val gfCache: GemFireCacheImpl = Misc.getGemFireCache
    val resMgr: InternalResourceManager = gfCache.getResourceManager
    // Use the heap monitor's test hooks: pretend the max heap is 100 bytes with
    // 92 bytes in use, so the 90% critical threshold (90 bytes) is breached
    HeapMemoryMonitor.setTestDisableMemoryUpdates(true)
    resMgr.getHeapMonitor.setTestMaxMemoryBytes(100)
    HeapMemoryMonitor.setTestBytesUsedForThresholdSet(92)
    resMgr.setCriticalHeapPercentage(90F)

    resMgr.getHeapMonitor.updateStateAndSendEvent(92)
    println("CRITICAL UP event sent")
  }

  def raiseEvictionUpMemoryEvent(): Unit = {
    println("About to raise EVICTION UP event")
    val gfCache: GemFireCacheImpl = Misc.getGemFireCache
    val resMgr: InternalResourceManager = gfCache.getResourceManager
    // Pretend the max heap is 100 bytes with 90 bytes in use, so the 40%
    // eviction threshold (40 bytes) is breached when 85 bytes of usage is reported
    HeapMemoryMonitor.setTestDisableMemoryUpdates(true)
    resMgr.getHeapMonitor.setTestMaxMemoryBytes(100)
    HeapMemoryMonitor.setTestBytesUsedForThresholdSet(90)
    resMgr.setEvictionHeapPercentage(40F)
    resMgr.getHeapMonitor.updateStateAndSendEvent(85)
    println("EVICTION UP event sent")
  }

  def resetGFResourceManager(): Unit = {
    val service = ServiceManager.currentFabricServiceInstance
    if (service != null) {
      val gfCache: GemFireCacheImpl = Misc.getGemFireCache
      val resMgr: InternalResourceManager = gfCache.getResourceManager
      // Clear the test hook and report a low usage value to reset the monitor state
      resMgr.getHeapMonitor.setTestMaxMemoryBytes(0)
      resMgr.getHeapMonitor.updateStateAndSendEvent(10)
    }
  }
}
ExecutorInitiator.scala
@@ -27,6 +27,9 @@ import org.apache.spark.{Logging, SparkCallbacks, SparkConf, SparkEnv}
*/
object ExecutorInitiator extends Logging {

  val SNAPPY_BLOCKMANAGER = "org.apache.spark.storage.SnappyBlockManager"
  val SNAPPY_SHUFFLEMEMORYMANAGER = "org.apache.spark.shuffle.SnappyShuffleMemoryManager"

  var executorRunnable: ExecutorRunnable = new ExecutorRunnable

  var executorThread: Thread = new Thread(executorRunnable)
@@ -115,9 +118,10 @@ object ExecutorInitiator extends Logging {
}
}
}

//TODO: Hemant: add executor specific properties from local conf to
//TODO: this conf that was received from driver.
driverConf.set("spark.blockManager", SNAPPY_BLOCKMANAGER)
driverConf.set("spark.shuffleMemoryManager", SNAPPY_SHUFFLEMEMORYMANAGER)

A Contributor commented:
One issue with this approach going forward could be that it will work only with SnappyCM and not with Yarn (when we add full support for that). Perhaps a better approach might be to introduce a callback somewhere in base ShuffleMemoryManager.tryAcquire that can be plugged in whenever driver joins the GemXD cluster. Not critical to correct for 0.2 milestone but I think should track in Jira.
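A rough sketch of the pluggable-callback alternative suggested in this comment. The MemoryEventListener trait and MemoryEventCallback registry below are hypothetical names for illustration, not existing Spark or SnappyData API:

package org.apache.spark.shuffle

// Hypothetical shape of the proposed hook: the base ShuffleMemoryManager.tryToAcquire
// would consult a registered listener, and SnappyData would register one when the
// driver joins the GemXD cluster. With no listener registered the behavior is
// unchanged, so the same code path would also work under Yarn.
private[spark] trait MemoryEventListener {
  // Return false to veto an allocation of numBytes
  def allowAllocation(numBytes: Long): Boolean
}

private[spark] object MemoryEventCallback {
  @volatile private var listener: Option[MemoryEventListener] = None

  // Registered once, e.g. when the driver joins the GemXD cluster
  def register(l: MemoryEventListener): Unit = listener = Some(l)

  // Consulted from the base ShuffleMemoryManager.tryToAcquire before granting memory
  def allowAllocation(numBytes: Long): Boolean =
    listener.forall(_.allowAllocation(numBytes))
}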

//TODO: Hemant: get the number of cores from spark conf

SnappyShuffleMemoryManager.scala (new file)
@@ -0,0 +1,31 @@
package org.apache.spark.shuffle

import org.apache.spark.storage.SnappyMemoryUtils

/**
 * Created by shirishd on 15/10/15.
 */
private[spark] class SnappyShuffleMemoryManager protected(
    override val maxMemory: Long,
    override val pageSizeBytes: Long)
    extends ShuffleMemoryManager(maxMemory, pageSizeBytes) {

  override def tryToAcquire(numBytes: Long): Long = synchronized {
    val taskAttemptId = currentTaskAttemptId()
    assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)

    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire
    if (!taskMemory.contains(taskAttemptId)) {
      taskMemory(taskAttemptId) = 0L
      notifyAll() // Will later cause waiting tasks to wake up and check numThreads again
    }

    if (SnappyMemoryUtils.isCriticalUp) {
      logInfo(s"Will not allocate $numBytes bytes as CRITICAL UP event is detected")
      0
    } else {
      super.tryToAcquire(numBytes)
    }
  }
}

SnappyBlockManager.scala (new file)
@@ -0,0 +1,29 @@
package org.apache.spark.storage

import org.apache.spark.network.BlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.{MapOutputTracker, SecurityManager, SparkConf}

/**
 * Created by shirishd on 12/10/15.
 */
private[spark] class SnappyBlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    override val master: BlockManagerMaster,
    defaultSerializer: Serializer,
    override val conf: SparkConf,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
    extends BlockManager(executorId, rpcEnv, master, defaultSerializer, conf, mapOutputTracker,
      shuffleManager, blockTransferService, securityManager, numUsableCores) {

  // Swap in SnappyMemoryStore so that caching honors GemFire CRITICAL/EVICTION UP events
  override private[spark] val memoryStore =
    new SnappyMemoryStore(this, BlockManager.getMaxMemory(conf))
}

SnappyMemoryStore.scala (new file)
@@ -0,0 +1,35 @@
package org.apache.spark.storage

import scala.collection.mutable.ArrayBuffer


/**
 * Created by shirishd on 9/10/15.
 */
private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Long)
    extends MemoryStore(blockManager, maxMemory) {

  override def freeMemory: Long = {
    if (SnappyMemoryUtils.isEvictionUp) {
      // Report no free memory so that no more blocks are cached while the
      // snappy-store EVICTION UP event is in effect
      logInfo("Snappy-store EVICTION UP event detected")
      0
    } else {
      super.freeMemory
    }
  }

  override def ensureFreeSpace(
      blockIdToAdd: BlockId,
      space: Long): ResultWithDroppedBlocks = {

    if (SnappyMemoryUtils.isCriticalUp) {
      logInfo(s"Will not store $blockIdToAdd as CRITICAL UP event is detected")
      val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
      return ResultWithDroppedBlocks(success = false, droppedBlocks)
    }
    super.ensureFreeSpace(blockIdToAdd, space)
  }
}
SnappyMemoryUtils.scala (new file)
@@ -0,0 +1,24 @@
package org.apache.spark.storage

import com.pivotal.gemfirexd.internal.engine.store.GemFireStore

/**
 * Created by shirishd on 29/10/15.
 */
object SnappyMemoryUtils {

  /**
   * Checks whether the GemFire critical heap threshold has been breached.
   * @return true if a CRITICAL UP event is in effect
   */
  def isCriticalUp: Boolean = {
    Option(GemFireStore.getBootingInstance).exists(_.thresholdListener.isCritical)
  }

  /**
   * Checks whether the GemFire eviction heap threshold has been breached.
   * @return true if an EVICTION UP event is in effect
   */
  def isEvictionUp: Boolean = {
    Option(GemFireStore.getBootingInstance).exists(_.thresholdListener.isEviction)
  }
}