diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala
new file mode 100644
index 0000000000..8502b22537
--- /dev/null
+++ b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala
@@ -0,0 +1,126 @@
+package io.snappydata.dunit.cluster
+
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl
+import com.gemstone.gemfire.internal.cache.control.{HeapMemoryMonitor, InternalResourceManager}
+import com.pivotal.gemfirexd.internal.engine.Misc
+import io.snappydata.ServiceManager
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.storage.RDDInfo
+
+/**
+ * Tests that Spark block caching and shuffle memory allocation back off
+ * once GemFire CRITICAL UP or EVICTION UP memory events are raised on a
+ * server.
+ */
+class SnappyResourceEventsDUnitTest(s: String) extends ClusterManagerTestBase(s) {
+
+  override def tearDown2(): Unit = {
+    Array(vm3, vm2, vm1, vm0).foreach(_.invoke(this.getClass, "resetGFResourceManager"))
+    super.tearDown2()
+  }
+
+  def testCriticalUp(): Unit = {
+    vm1.invoke(this.getClass, "startSnappyServer", startArgs)
+    vm0.invoke(this.getClass, "startSnappyLead", startArgs)
+
+    // Execute the job
+    vm0.invoke(this.getClass, "runSparkJob")
+    vm1.invoke(this.getClass, "raiseCriticalUpMemoryEvent")
+    vm0.invoke(this.getClass, "runSparkJobAfterThresholdBreach")
+
+    vm1.invoke(this.getClass, "assertShuffleMemoryManagerBehavior")
+  }
+
+  def testEvictionUp(): Unit = {
+    vm1.invoke(this.getClass, "startSnappyServer", startArgs)
+    vm0.invoke(this.getClass, "startSnappyLead", startArgs)
+
+    // Execute the job
+    vm0.invoke(this.getClass, "runSparkJob")
+    vm1.invoke(this.getClass, "raiseEvictionUpMemoryEvent")
+    vm0.invoke(this.getClass, "runSparkJobAfterThresholdBreach")
+  }
+}
+
+object SnappyResourceEventsDUnitTest extends ClusterManagerTestUtils {
+
+  def runSparkJob(): Unit = {
+    val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache()
+    println(rdd1.count)
+    assert(sc.getRDDStorageInfo.nonEmpty)
+  }
+
+  def assertShuffleMemoryManagerBehavior(): Unit = {
+    assert(SparkEnv.get.shuffleMemoryManager.tryToAcquire(1000) == 0)
+  }
+
+  def getInMemorySizeForCachedRDDs: Long = {
+    val rddInfo: Array[RDDInfo] = sc.getRDDStorageInfo
+    rddInfo.map(_.memSize).sum
+  }
+
+  def runSparkJobAfterThresholdBreach(): Unit = {
+    val sum1: Long = getInMemorySizeForCachedRDDs
+    println("1. cached rdd mem size before caching rdd when critical or eviction up = " + sum1)
+
+    val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache()
+    println(rdd2.count)
+    val sum2: Long = getInMemorySizeForCachedRDDs
+    println("2. cached rdd mem size after caching first rdd when critical or eviction up = " + sum2)
+    // make sure that caching a new rdd after critical or eviction up does
+    // not result in increased memory usage
+    assert(sum2 <= sum1)
+
+    val rdd3 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache()
+    println(rdd3.count)
+    val sum3: Long = getInMemorySizeForCachedRDDs
+    println("3. cached rdd mem size after caching second rdd when critical or eviction up = " + sum3)
+    // make sure that caching a new rdd after critical or eviction up does
+    // not result in increased memory usage
+    assert(sum3 <= sum2)
+  }
+
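+  // Pins the heap monitor to artificial numbers: max memory is forced to
+  // 100 bytes and the critical threshold to 90%, i.e. 90 bytes; pushing an
+  // update of 92 used bytes then crosses the threshold and raises
+  // CRITICAL UP on this member.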
+  def raiseCriticalUpMemoryEvent(): Unit = {
+    println("About to raise CRITICAL UP event")
+    val gfCache: GemFireCacheImpl = Misc.getGemFireCache
+    val resMgr: InternalResourceManager = gfCache.getResourceManager
+    HeapMemoryMonitor.setTestDisableMemoryUpdates(true)
+    resMgr.getHeapMonitor.setTestMaxMemoryBytes(100)
+    HeapMemoryMonitor.setTestBytesUsedForThresholdSet(92)
+    resMgr.setCriticalHeapPercentage(90F)
+    resMgr.getHeapMonitor.updateStateAndSendEvent(92)
+    println("CRITICAL UP event sent")
+  }
+
+  def raiseEvictionUpMemoryEvent(): Unit = {
+    println("About to raise EVICTION UP event")
+    val gfCache: GemFireCacheImpl = Misc.getGemFireCache
+    val resMgr: InternalResourceManager = gfCache.getResourceManager
+    HeapMemoryMonitor.setTestDisableMemoryUpdates(true)
+    resMgr.getHeapMonitor.setTestMaxMemoryBytes(100)
+    HeapMemoryMonitor.setTestBytesUsedForThresholdSet(90)
+    resMgr.setEvictionHeapPercentage(40F)
+    resMgr.getHeapMonitor.updateStateAndSendEvent(85)
+    println("EVICTION UP event sent")
+  }
+
+  def resetGFResourceManager(): Unit = {
+    val service = ServiceManager.currentFabricServiceInstance
+    if (service != null) {
+      val gfCache: GemFireCacheImpl = Misc.getGemFireCache
+      val resMgr: InternalResourceManager = gfCache.getResourceManager
+      resMgr.getHeapMonitor.setTestMaxMemoryBytes(0)
+      resMgr.getHeapMonitor.updateStateAndSendEvent(10)
+    }
+  }
+}
\ No newline at end of file
diff --git a/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala b/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala
index d455672415..d39467396e 100644
--- a/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala
+++ b/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala
@@ -27,6 +27,9 @@ import org.apache.spark.{Logging, SparkCallbacks, SparkConf, SparkEnv}
  */
 object ExecutorInitiator extends Logging {
 
+  val SNAPPY_BLOCKMANAGER = "org.apache.spark.storage.SnappyBlockManager"
+  val SNAPPY_SHUFFLEMEMORYMANAGER = "org.apache.spark.shuffle.SnappyShuffleMemoryManager"
+
   var executorRunnable: ExecutorRunnable = new ExecutorRunnable
 
   var executorThread: Thread = new Thread(executorRunnable)
@@ -115,9 +118,10 @@
         }
       }
     }
-
-    //TODO: Hemant: add executor specific properties from local conf to
-    //TODO: this conf that was received from driver.
+    //TODO: Hemant: add executor specific properties from local conf to
+    //TODO: this conf that was received from driver.
+    driverConf.set("spark.blockManager", SNAPPY_BLOCKMANAGER)
+    driverConf.set("spark.shuffleMemoryManager", SNAPPY_SHUFFLEMEMORYMANAGER)
 
     //TODO: Hemant: get the number of cores from spark conf
diff --git a/snappy-tools/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala b/snappy-tools/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala
new file mode 100644
index 0000000000..0c47d4f123
--- /dev/null
+++ b/snappy-tools/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala
@@ -0,0 +1,31 @@
+package org.apache.spark.shuffle
+
+import org.apache.spark.storage.SnappyMemoryUtils
+
+/**
+ * A ShuffleMemoryManager that stops granting memory once the GemFire
+ * critical heap threshold has been breached.
+ */
+private[spark] class SnappyShuffleMemoryManager protected(override val maxMemory: Long,
+    override val pageSizeBytes: Long) extends ShuffleMemoryManager(maxMemory, pageSizeBytes) {
+
+  override def tryToAcquire(numBytes: Long): Long = synchronized {
+    val taskAttemptId = currentTaskAttemptId()
+    assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
+
+    // Add this task to the taskMemory map just so we can keep an accurate count of the number
+    // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire
+    if (!taskMemory.contains(taskAttemptId)) {
+      taskMemory(taskAttemptId) = 0L
+      notifyAll() // Will later cause waiting tasks to wake up and check numThreads again
+    }
+
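+    // Returning 0 tells the caller that no memory could be granted; Spark's
+    // shuffle code responds by spilling to disk rather than growing its
+    // in-memory buffers.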
+    if (SnappyMemoryUtils.isCriticalUp) {
+      logInfo(s"Will not store $numBytes bytes as CRITICAL UP event is detected")
+      0
+    } else {
+      super.tryToAcquire(numBytes)
+    }
+  }
+}
diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala
new file mode 100644
index 0000000000..862a12ec9f
--- /dev/null
+++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala
@@ -0,0 +1,29 @@
+package org.apache.spark.storage
+
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.{MapOutputTracker, SecurityManager, SparkConf}
+
+/**
+ * A BlockManager that plugs in SnappyMemoryStore so that block caching
+ * backs off while GemFire memory events are in effect.
+ */
+private[spark] class SnappyBlockManager(
+    executorId: String,
+    rpcEnv: RpcEnv,
+    override val master: BlockManagerMaster,
+    defaultSerializer: Serializer,
+    override val conf: SparkConf,
+    mapOutputTracker: MapOutputTracker,
+    shuffleManager: ShuffleManager,
+    blockTransferService: BlockTransferService,
+    securityManager: SecurityManager,
+    numUsableCores: Int)
+    extends BlockManager(executorId, rpcEnv, master, defaultSerializer, conf, mapOutputTracker,
+      shuffleManager, blockTransferService, securityManager, numUsableCores) {
+
+  override private[spark] val memoryStore =
+    new SnappyMemoryStore(this, BlockManager.getMaxMemory(conf))
+}
diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala
new file mode 100644
index 0000000000..ae0d85d1dd
--- /dev/null
+++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala
@@ -0,0 +1,35 @@
+package org.apache.spark.storage
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * A MemoryStore that consults the GemFire threshold state before accepting
+ * new blocks.
+ */
+private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Long)
+    extends MemoryStore(blockManager, maxMemory) {
+
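+  // Reporting zero free memory while EVICTION UP is in effect makes the
+  // store treat itself as full, so further caching takes the eviction/drop
+  // path instead of growing the heap.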
+  override def freeMemory: Long = {
+    if (SnappyMemoryUtils.isEvictionUp) {
+      logInfo("Snappy-store EVICTION UP event detected")
+      0
+    } else {
+      super.freeMemory
+    }
+  }
+
+  override def ensureFreeSpace(
+      blockIdToAdd: BlockId,
+      space: Long): ResultWithDroppedBlocks = {
+    if (SnappyMemoryUtils.isCriticalUp) {
+      logInfo(s"Will not store $blockIdToAdd as CRITICAL UP event is detected")
+      val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+      return ResultWithDroppedBlocks(success = false, droppedBlocks)
+    }
+    super.ensureFreeSpace(blockIdToAdd, space)
+  }
+}
diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala
new file mode 100644
index 0000000000..6cbe186278
--- /dev/null
+++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala
@@ -0,0 +1,24 @@
+package org.apache.spark.storage
+
+import com.pivotal.gemfirexd.internal.engine.store.GemFireStore
+
+/**
+ * Helpers for querying the GemFire memory threshold state from Spark code.
+ */
+object SnappyMemoryUtils {
+
+  /**
+   * Checks whether the GemFire critical heap threshold has been breached.
+   * @return true if a CRITICAL UP event is in effect; false otherwise,
+   *         including when no store has booted yet
+   */
+  def isCriticalUp: Boolean = {
+    Option(GemFireStore.getBootingInstance).exists(_.thresholdListener.isCritical)
+  }
+
+  /**
+   * Checks whether the GemFire eviction heap threshold has been breached.
+   * @return true if an EVICTION UP event is in effect; false otherwise,
+   *         including when no store has booted yet
+   */
+  def isEvictionUp: Boolean = {
+    Option(GemFireStore.getBootingInstance).exists(_.thresholdListener.isEviction)
+  }
+}
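
Note: this change sets the "spark.blockManager" and "spark.shuffleMemoryManager" keys in ExecutorInitiator but does not show where they are consumed; presumably SnappyData's Spark fork reads them while constructing the executor's SparkEnv. A minimal sketch of that lookup, assuming such a hook exists — the key names come from ExecutorInitiator above, while SnappyEnvWiringSketch, resolveSnappyOverrides and the stock-class fallbacks are illustrative assumptions, not part of this diff:

    import org.apache.spark.SparkConf

    object SnappyEnvWiringSketch {
      // Hypothetical sketch only: the real lookup would live in the patched
      // SparkEnv, which would also instantiate the resolved classes
      // reflectively with the constructor arguments stock Spark uses.
      def resolveSnappyOverrides(conf: SparkConf): (String, String) = {
        val blockManagerClass = conf.get("spark.blockManager",
          "org.apache.spark.storage.BlockManager")
        val shuffleMemoryManagerClass = conf.get("spark.shuffleMemoryManager",
          "org.apache.spark.shuffle.ShuffleMemoryManager")
        (blockManagerClass, shuffleMemoryManagerClass)
      }
    }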