From 504481618946b899ee751667b8bb00305a976bb9 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Thu, 15 Oct 2015 17:36:29 +0530 Subject: [PATCH 01/12] Added SnappyShuffleMemoryManager, SnappyBlockManager, SnappyMemoryStore classes that check GemXD critical up events --- .../shuffle/SnappyShuffleMemoryManager.scala | 46 +++++++++++++++++++ .../spark/storage/SnappyBlockManager.scala | 37 +++++++++++++++ .../spark/storage/SnappyMemoryStore.scala | 40 ++++++++++++++++ 3 files changed, 123 insertions(+) create mode 100644 snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala create mode 100644 snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala create mode 100644 snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala diff --git a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala b/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala new file mode 100644 index 0000000000..b325aad1b7 --- /dev/null +++ b/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala @@ -0,0 +1,46 @@ +package org.apache.spark.shuffle + +import scala.reflect.runtime.{universe => ru} + +/** + * Created by shirishd on 15/10/15. + */ + +private[spark] class SnappyShuffleMemoryManager protected(override val maxMemory: Long, + override val pageSizeBytes: Long) extends ShuffleMemoryManager(maxMemory, pageSizeBytes) { + + override def tryToAcquire(numBytes: Long): Long = synchronized { + val taskAttemptId = currentTaskAttemptId() + assert(numBytes > 0, "invalid number of bytes requested: " + numBytes) + + // Add this task to the taskMemory map just so we can keep an accurate count of the number + // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire + if (!taskMemory.contains(taskAttemptId)) { + taskMemory(taskAttemptId) = 0L + notifyAll() // Will later cause waiting tasks to wake up and check numThreads again + } + + if (isCriticalUp) { + 0 + } else { + super.tryToAcquire(numBytes) + } + } + + def isCriticalUp(): Boolean = { + val companionObject = "org.apache.spark.storage.SnappyMemoryStore" + val mirror = ru.runtimeMirror(getClass.getClassLoader) + val moduleSymbol = mirror.staticModule(companionObject) + val moduleMirror = mirror.reflectModule(moduleSymbol) + val instanceMirror = mirror.reflect(moduleMirror.instance) + val method = instanceMirror.reflectMethod(instanceMirror.symbol.toType.declaration(ru.newTermName("isCriticalUp")).asMethod) + val ret = method() + ret.asInstanceOf[Boolean] + +// val className = Class.forName("org.apache.spark.storage.SnappyMemoryStore$") //note the $ +// className.getField("MODULE$").get(null).asInstanceOf[Boolean] + } + + +} + diff --git a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala b/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala new file mode 100644 index 0000000000..b76a445106 --- /dev/null +++ b/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala @@ -0,0 +1,37 @@ +package org.apache.spark.storage + +import org.apache.spark.network.{BlockDataManager, BlockTransferService} +import org.apache.spark.shuffle.ShuffleManager +import org.apache.spark.{SecurityManager, MapOutputTracker, SparkConf} +import org.apache.spark.rpc.RpcEnv +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ + +/** + * Created by shirishd on 12/10/15. + */ + +private[spark] class SnappyBlockManager( + executorId: String, + rpcEnv: RpcEnv, + override val master: BlockManagerMaster, + defaultSerializer: Serializer, + override val conf: SparkConf, + mapOutputTracker: MapOutputTracker, + shuffleManager: ShuffleManager, + blockTransferService: BlockTransferService, + securityManager: SecurityManager, + numUsableCores: Int) + extends BlockManager(executorId, rpcEnv, master, defaultSerializer, conf, mapOutputTracker, + shuffleManager, blockTransferService, securityManager, numUsableCores) { + + + override protected[spark] val memoryStore = Class.forName("org.apache.spark.storage.SnappyMemoryStore"). + //getConstructors()(0).newInstance(this, maxMemory: java.lang.Long). + getConstructors()(0).newInstance(this, BlockManager.getMaxMemory(conf): java.lang.Long). + asInstanceOf[MemoryStore] +} + +object SnappyBlockManager { + +} \ No newline at end of file diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala new file mode 100644 index 0000000000..70f8d3ef07 --- /dev/null +++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala @@ -0,0 +1,40 @@ +package org.apache.spark.storage + +import scala.collection.mutable.ArrayBuffer + +import com.pivotal.gemfirexd.internal.engine.store.GemFireStore + + +/** + * Created by shirishd on 9/10/15. + */ +private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Long) + extends MemoryStore(blockManager, maxMemory) { + + override def ensureFreeSpace( + blockIdToAdd: BlockId, + space: Long): ResultWithDroppedBlocks = { + + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + + println(s"GemFireStore.getBootedInstance = " + GemFireStore.getBootedInstance) + + if (SnappyMemoryStore.isCriticalUp()) { + println(s"Will not store $blockIdToAdd as CRITICAL UP event is received") + logInfo(s"Will not store $blockIdToAdd as CRITICAL UP event is received") + return ResultWithDroppedBlocks(success = false, droppedBlocks) + } + super.ensureFreeSpace(blockIdToAdd, space) + } + +} + +object SnappyMemoryStore { + + def isCriticalUp(): Boolean = { + GemFireStore.getBootedInstance != null && GemFireStore.getBootedInstance.thresholdListener.isCritical + } + +} + + From b5cc9dfc0aef9418eb673b7759a7d6904203c1a3 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Tue, 20 Oct 2015 10:21:21 +0530 Subject: [PATCH 02/12] Added a dunit test --- .../dunit/SnappyCriticalUpDUnitTest.scala | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 snappy-dunits/src/test/scala/io/snappydata/dunit/SnappyCriticalUpDUnitTest.scala diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/SnappyCriticalUpDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/SnappyCriticalUpDUnitTest.scala new file mode 100644 index 0000000000..bdd88848e2 --- /dev/null +++ b/snappy-dunits/src/test/scala/io/snappydata/dunit/SnappyCriticalUpDUnitTest.scala @@ -0,0 +1,140 @@ +package io.snappydata.dunit + +import java.util.Properties + +import scala.Predef._ + +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl +import com.gemstone.gemfire.internal.cache.control.{HeapMemoryMonitor, InternalResourceManager} +import com.pivotal.gemfirexd.internal.engine.Misc +import com.pivotal.gemfirexd.internal.engine.store.GemFireStore +import dunit.{DistributedTestBase, Host} +import io.snappydata.ServiceManager + +/** + * Created by shirishd on 19/10/15. + */ +class SnappyCriticalUpDUnitTest (s: String) extends DistributedTestBase(s) { + + val host = Host.getHost(0); + + val vm0 = host.getVM(0); + val vm1 = host.getVM(1); + val vm2 = host.getVM(2); + val vm3 = host.getVM(3); + + override + def setUp(): Unit = { + //super.setUp() + } + + override + def tearDown2(): Unit = { + } + + def testCriticalUp(): Unit = { + val locatorNetPort = 9999 //AvailablePortHelper.getRandomAvailableTCPPort + val serverNetPort = 8888 //AvailablePortHelper.getRandomAvailableTCPPort + val peerDiscoveryPort = 7777 //AvailablePortHelper.getRandomAvailableTCPPort + + val locatorArgs = new Array[AnyRef](3) + locatorArgs(0) = "localhost" + locatorArgs(1) = new Integer(locatorNetPort) + locatorArgs(2) = new Integer(peerDiscoveryPort) + + vm1.invoke(this.getClass, "startLocator", locatorArgs) + + val driverArgs = new Array[AnyRef](1) + //val locStr = InetAddress.getLocalHost.getHostAddress+"["+peerDiscoveryPort+"]" + val locStr = "localhost[" + peerDiscoveryPort + "]" + driverArgs(0) = locStr + + vm2.invoke(this.getClass, "startDriverApp", driverArgs) + } + +} + +object SnappyCriticalUpDUnitTest { + + def helloWorld(): Unit = { + hello("Hello World! " + this.getClass); + } + + def hello(s: String): Unit = { + println(s); + } + + def startLocator(bindAddress: String, netport: Int, peerDiscoveryPort: Int): Unit = { + val locatorService = ServiceManager.getLocatorInstance + val bootProps = new Properties() + bootProps.setProperty("persist-dd", "false") + locatorService.start("localhost", peerDiscoveryPort, bootProps) + locatorService.startNetworkServer("localhost", netport, bootProps) + println("Loc vm type = " + GemFireStore.getBootedInstance.getMyVMKind) + println("locator prop in loc = " + InternalDistributedSystem.getConnectedInstance.getConfig.getLocators) + } + + def startDriverApp(locatorStr: String): Unit = { + startSnappyLocalModeAndTestCiritcalUp(locatorStr) + val dsys = InternalDistributedSystem.getConnectedInstance + assert(dsys != null) + println("Driver vm type = " + GemFireStore.getBootedInstance.getMyVMKind) + println("locator prop in driver app = " + InternalDistributedSystem.getConnectedInstance.getConfig.getLocators) + } + + private def startSnappyLocalModeAndTestCiritcalUp(locStr: String): Unit = { + val setMaster: String = "local[6]" + + val conf = new org.apache.spark.SparkConf().setAppName("SnappyCriticalUpDUnitTest") + .set("spark.logConf", "true") + + if (setMaster != null) { + conf.setMaster(setMaster) + } + + // Set the url from the locator + val snappydataurl = "jdbc:snappydata:;locators=" + locStr + ";persist-dd=false;" + conf.set("gemfirexd.db.url", snappydataurl) + conf.set("gemfirexd.db.driver", "com.pivotal.gemfirexd.jdbc.EmbeddedDriver") + + + val sc = new org.apache.spark.SparkContext(conf) + val snContext = org.apache.spark.sql.SnappyContext(sc) + snContext.sql("set spark.sql.shuffle.partitions=6") + + val props = Map( + "url" -> snappydataurl, + "poolImpl" -> "tomcat", + "driver" -> "com.pivotal.gemfirexd.jdbc.EmbeddedDriver", + "user" -> "app", + "password" -> "app" + ) + + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() + rdd1.count + assert(!sc.getRDDStorageInfo.isEmpty) + rdd1.unpersist(true) + assert(sc.getRDDStorageInfo.isEmpty) + + + raiseMemoryEvent(true) + val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() + rdd2.count + assert(sc.getRDDStorageInfo.isEmpty) + println(sc.getRDDStorageInfo.length) + } + + private def raiseMemoryEvent(criticalUp: Boolean): Unit = { + println("About to raise CRITICAL UP event") + val gfCache: GemFireCacheImpl = Misc.getGemFireCache + val resMgr: InternalResourceManager = gfCache.getResourceManager + resMgr.getHeapMonitor.setTestMaxMemoryBytes(100) + HeapMemoryMonitor.setTestBytesUsedForThresholdSet(90) + resMgr.setCriticalHeapPercentage(90F) + + resMgr.getHeapMonitor().updateStateAndSendEvent(92); + println("CRITICAL UP event sent") + + } +} From d0c8697df4e2ce5c0d02f380dfde00b42d13b703 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Wed, 21 Oct 2015 12:20:03 +0530 Subject: [PATCH 03/12] Minor changes --- .../scala/org/apache/spark/storage/SnappyBlockManager.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala b/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala index b76a445106..438a0a8807 100644 --- a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala +++ b/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala @@ -27,11 +27,6 @@ private[spark] class SnappyBlockManager( override protected[spark] val memoryStore = Class.forName("org.apache.spark.storage.SnappyMemoryStore"). - //getConstructors()(0).newInstance(this, maxMemory: java.lang.Long). getConstructors()(0).newInstance(this, BlockManager.getMaxMemory(conf): java.lang.Long). asInstanceOf[MemoryStore] -} - -object SnappyBlockManager { - } \ No newline at end of file From f991f8295e4cb0393ee4f7aa94438a6ba8f33e51 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Wed, 21 Oct 2015 12:39:33 +0530 Subject: [PATCH 04/12] Resolving conflicts in rebase --- .../shuffle/SnappyShuffleMemoryManager.scala | 4 +- .../dunit/SnappyCriticalUpDUnitTest.scala | 140 ------------------ .../cluster/SnappyCriticalUpDUnitTest.scala | 73 +++++++++ .../cluster/ExecutorInitiator.scala | 6 +- .../spark/storage/SnappyMemoryStore.scala | 3 - 5 files changed, 77 insertions(+), 149 deletions(-) delete mode 100644 snappy-dunits/src/test/scala/io/snappydata/dunit/SnappyCriticalUpDUnitTest.scala create mode 100644 snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala diff --git a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala b/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala index b325aad1b7..d0eda434bb 100644 --- a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala +++ b/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala @@ -21,6 +21,7 @@ private[spark] class SnappyShuffleMemoryManager protected(override val maxMemory } if (isCriticalUp) { + logInfo(s"Will not store $numBytes bytes as CRITICAL UP event is received") 0 } else { super.tryToAcquire(numBytes) @@ -36,9 +37,6 @@ private[spark] class SnappyShuffleMemoryManager protected(override val maxMemory val method = instanceMirror.reflectMethod(instanceMirror.symbol.toType.declaration(ru.newTermName("isCriticalUp")).asMethod) val ret = method() ret.asInstanceOf[Boolean] - -// val className = Class.forName("org.apache.spark.storage.SnappyMemoryStore$") //note the $ -// className.getField("MODULE$").get(null).asInstanceOf[Boolean] } diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/SnappyCriticalUpDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/SnappyCriticalUpDUnitTest.scala deleted file mode 100644 index bdd88848e2..0000000000 --- a/snappy-dunits/src/test/scala/io/snappydata/dunit/SnappyCriticalUpDUnitTest.scala +++ /dev/null @@ -1,140 +0,0 @@ -package io.snappydata.dunit - -import java.util.Properties - -import scala.Predef._ - -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl -import com.gemstone.gemfire.internal.cache.control.{HeapMemoryMonitor, InternalResourceManager} -import com.pivotal.gemfirexd.internal.engine.Misc -import com.pivotal.gemfirexd.internal.engine.store.GemFireStore -import dunit.{DistributedTestBase, Host} -import io.snappydata.ServiceManager - -/** - * Created by shirishd on 19/10/15. - */ -class SnappyCriticalUpDUnitTest (s: String) extends DistributedTestBase(s) { - - val host = Host.getHost(0); - - val vm0 = host.getVM(0); - val vm1 = host.getVM(1); - val vm2 = host.getVM(2); - val vm3 = host.getVM(3); - - override - def setUp(): Unit = { - //super.setUp() - } - - override - def tearDown2(): Unit = { - } - - def testCriticalUp(): Unit = { - val locatorNetPort = 9999 //AvailablePortHelper.getRandomAvailableTCPPort - val serverNetPort = 8888 //AvailablePortHelper.getRandomAvailableTCPPort - val peerDiscoveryPort = 7777 //AvailablePortHelper.getRandomAvailableTCPPort - - val locatorArgs = new Array[AnyRef](3) - locatorArgs(0) = "localhost" - locatorArgs(1) = new Integer(locatorNetPort) - locatorArgs(2) = new Integer(peerDiscoveryPort) - - vm1.invoke(this.getClass, "startLocator", locatorArgs) - - val driverArgs = new Array[AnyRef](1) - //val locStr = InetAddress.getLocalHost.getHostAddress+"["+peerDiscoveryPort+"]" - val locStr = "localhost[" + peerDiscoveryPort + "]" - driverArgs(0) = locStr - - vm2.invoke(this.getClass, "startDriverApp", driverArgs) - } - -} - -object SnappyCriticalUpDUnitTest { - - def helloWorld(): Unit = { - hello("Hello World! " + this.getClass); - } - - def hello(s: String): Unit = { - println(s); - } - - def startLocator(bindAddress: String, netport: Int, peerDiscoveryPort: Int): Unit = { - val locatorService = ServiceManager.getLocatorInstance - val bootProps = new Properties() - bootProps.setProperty("persist-dd", "false") - locatorService.start("localhost", peerDiscoveryPort, bootProps) - locatorService.startNetworkServer("localhost", netport, bootProps) - println("Loc vm type = " + GemFireStore.getBootedInstance.getMyVMKind) - println("locator prop in loc = " + InternalDistributedSystem.getConnectedInstance.getConfig.getLocators) - } - - def startDriverApp(locatorStr: String): Unit = { - startSnappyLocalModeAndTestCiritcalUp(locatorStr) - val dsys = InternalDistributedSystem.getConnectedInstance - assert(dsys != null) - println("Driver vm type = " + GemFireStore.getBootedInstance.getMyVMKind) - println("locator prop in driver app = " + InternalDistributedSystem.getConnectedInstance.getConfig.getLocators) - } - - private def startSnappyLocalModeAndTestCiritcalUp(locStr: String): Unit = { - val setMaster: String = "local[6]" - - val conf = new org.apache.spark.SparkConf().setAppName("SnappyCriticalUpDUnitTest") - .set("spark.logConf", "true") - - if (setMaster != null) { - conf.setMaster(setMaster) - } - - // Set the url from the locator - val snappydataurl = "jdbc:snappydata:;locators=" + locStr + ";persist-dd=false;" - conf.set("gemfirexd.db.url", snappydataurl) - conf.set("gemfirexd.db.driver", "com.pivotal.gemfirexd.jdbc.EmbeddedDriver") - - - val sc = new org.apache.spark.SparkContext(conf) - val snContext = org.apache.spark.sql.SnappyContext(sc) - snContext.sql("set spark.sql.shuffle.partitions=6") - - val props = Map( - "url" -> snappydataurl, - "poolImpl" -> "tomcat", - "driver" -> "com.pivotal.gemfirexd.jdbc.EmbeddedDriver", - "user" -> "app", - "password" -> "app" - ) - - val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() - rdd1.count - assert(!sc.getRDDStorageInfo.isEmpty) - rdd1.unpersist(true) - assert(sc.getRDDStorageInfo.isEmpty) - - - raiseMemoryEvent(true) - val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() - rdd2.count - assert(sc.getRDDStorageInfo.isEmpty) - println(sc.getRDDStorageInfo.length) - } - - private def raiseMemoryEvent(criticalUp: Boolean): Unit = { - println("About to raise CRITICAL UP event") - val gfCache: GemFireCacheImpl = Misc.getGemFireCache - val resMgr: InternalResourceManager = gfCache.getResourceManager - resMgr.getHeapMonitor.setTestMaxMemoryBytes(100) - HeapMemoryMonitor.setTestBytesUsedForThresholdSet(90) - resMgr.setCriticalHeapPercentage(90F) - - resMgr.getHeapMonitor().updateStateAndSendEvent(92); - println("CRITICAL UP event sent") - - } -} diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala new file mode 100644 index 0000000000..16b30b2590 --- /dev/null +++ b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala @@ -0,0 +1,73 @@ +package io.snappydata.dunit.cluster + +import scala.Predef._ + +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl +import com.gemstone.gemfire.internal.cache.control.{HeapMemoryMonitor, InternalResourceManager} +import com.pivotal.gemfirexd.internal.engine.Misc + +import org.apache.spark.SparkEnv + +/** + * Created by shirishd on 19/10/15. + */ +class SnappyCriticalUpDUnitTest (s: String) extends ClusterManagerTestBase(s) { + + def testCriticalUp(): Unit = { + // Lead is started before other servers are started. + vm0.invoke(this.getClass, "startSnappyLead") + vm1.invoke(this.getClass, "startSnappyServer") + vm2.invoke(this.getClass, "startSnappyServer") + + // Execute the job + vm0.invoke(this.getClass, "runSparkJob") + vm1.invoke(this.getClass, "raiseMemoryEvent") + vm2.invoke(this.getClass, "raiseMemoryEvent") + vm0.invoke(this.getClass, "runSparkJobWithCriticalUp") + + vm1.invoke(this.getClass, "assertShuffleMemoryManagerBehavior") + vm2.invoke(this.getClass, "assertShuffleMemoryManagerBehavior") + + vm0.invoke(this.getClass, "stopSnappyLead") + vm1.invoke(this.getClass, "stopSnappyServer") + vm2.invoke(this.getClass, "stopSnappyServer") + } + +} + +object SnappyCriticalUpDUnitTest extends ClusterManagerTestUtils { + + def runSparkJob(): Unit = { + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() + rdd1.count + assert(!sc.getRDDStorageInfo.isEmpty) + rdd1.unpersist(true) + assert(sc.getRDDStorageInfo.isEmpty) + } + + def runSparkJobWithCriticalUp(): Unit = { + //val snContext = org.apache.spark.sql.SnappyContext(sc) + //raiseMemoryEvent(true) + val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() + rdd2.count + assert(sc.getRDDStorageInfo.isEmpty) + println(sc.getRDDStorageInfo.length) + } + + def assertShuffleMemoryManagerBehavior(): Unit = { + assert(SparkEnv.get.shuffleMemoryManager.tryToAcquire(1000) == 0) + } + + def raiseMemoryEvent(): Unit = { + println("About to raise CRITICAL UP event") + val gfCache: GemFireCacheImpl = Misc.getGemFireCache + val resMgr: InternalResourceManager = gfCache.getResourceManager + resMgr.getHeapMonitor.setTestMaxMemoryBytes(100) + HeapMemoryMonitor.setTestBytesUsedForThresholdSet(90) + resMgr.setCriticalHeapPercentage(90F) + + resMgr.getHeapMonitor().updateStateAndSendEvent(92); + println("CRITICAL UP event sent") + + } +} \ No newline at end of file diff --git a/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala b/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala index d455672415..6662cb3aa0 100644 --- a/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala +++ b/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala @@ -115,9 +115,9 @@ object ExecutorInitiator extends Logging { } } } - - //TODO: Hemant: add executor specific properties from local conf to - //TODO: this conf that was received from driver. + //TODO: Hemant: add executor specific properties from local conf to + //TODO: this conf that was received from driver. + driverConf.set("spark.snappydata.enabled", "true") //TODO: Hemant: get the number of cores from spark conf diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala index 70f8d3ef07..6eff957e6e 100644 --- a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala +++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala @@ -17,10 +17,7 @@ private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Lo val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - println(s"GemFireStore.getBootedInstance = " + GemFireStore.getBootedInstance) - if (SnappyMemoryStore.isCriticalUp()) { - println(s"Will not store $blockIdToAdd as CRITICAL UP event is received") logInfo(s"Will not store $blockIdToAdd as CRITICAL UP event is received") return ResultWithDroppedBlocks(success = false, droppedBlocks) } From 54ce8e591d8be6e8719d170ee90e5cda14649745 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Tue, 27 Oct 2015 16:01:14 +0530 Subject: [PATCH 05/12] Added a check for eviction up event --- .../shuffle/SnappyShuffleMemoryManager.scala | 2 +- .../apache/spark/storage/SnappyMemoryStore.scala | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala b/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala index d0eda434bb..636d1edd50 100644 --- a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala +++ b/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala @@ -21,7 +21,7 @@ private[spark] class SnappyShuffleMemoryManager protected(override val maxMemory } if (isCriticalUp) { - logInfo(s"Will not store $numBytes bytes as CRITICAL UP event is received") + logInfo(s"Will not store $numBytes bytes as CRITICAL UP event is detected") 0 } else { super.tryToAcquire(numBytes) diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala index 6eff957e6e..3aa5b7e718 100644 --- a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala +++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala @@ -11,6 +11,15 @@ import com.pivotal.gemfirexd.internal.engine.store.GemFireStore private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Long) extends MemoryStore(blockManager, maxMemory) { + override def freeMemory: Long = { + if(SnappyMemoryStore.isEvictionUp()) { + logInfo(s"Snappy-store EVICTION UP event detected") + 0 + } else { + super.freeMemory + } + } + override def ensureFreeSpace( blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = { @@ -18,12 +27,11 @@ private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Lo val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] if (SnappyMemoryStore.isCriticalUp()) { - logInfo(s"Will not store $blockIdToAdd as CRITICAL UP event is received") + logInfo(s"Will not store $blockIdToAdd as CRITICAL UP event is detected") return ResultWithDroppedBlocks(success = false, droppedBlocks) } super.ensureFreeSpace(blockIdToAdd, space) } - } object SnappyMemoryStore { @@ -32,6 +40,10 @@ object SnappyMemoryStore { GemFireStore.getBootedInstance != null && GemFireStore.getBootedInstance.thresholdListener.isCritical } + def isEvictionUp(): Boolean = { + GemFireStore.getBootedInstance != null && GemFireStore.getBootedInstance.thresholdListener.isEviction + } + } From f2337b2b8a43ef0da570c3d3c5b24329b9f56353 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Thu, 29 Oct 2015 17:29:00 +0530 Subject: [PATCH 06/12] Incorporate review comments --- .../shuffle/SnappyShuffleMemoryManager.scala | 15 ++++-------- .../spark/storage/SnappyBlockManager.scala | 11 +++++---- .../cluster/SnappyCriticalUpDUnitTest.scala | 9 ++++--- .../cluster/ExecutorInitiator.scala | 6 ++++- .../spark/storage/SnappyMemoryStore.scala | 15 ++---------- .../spark/storage/SnappyMemoryUtils.scala | 24 +++++++++++++++++++ 6 files changed, 46 insertions(+), 34 deletions(-) create mode 100644 snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala diff --git a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala b/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala index 636d1edd50..b3140201fb 100644 --- a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala +++ b/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala @@ -1,6 +1,6 @@ package org.apache.spark.shuffle -import scala.reflect.runtime.{universe => ru} +import org.apache.spark.util.Utils /** * Created by shirishd on 15/10/15. @@ -28,17 +28,10 @@ private[spark] class SnappyShuffleMemoryManager protected(override val maxMemory } } - def isCriticalUp(): Boolean = { - val companionObject = "org.apache.spark.storage.SnappyMemoryStore" - val mirror = ru.runtimeMirror(getClass.getClassLoader) - val moduleSymbol = mirror.staticModule(companionObject) - val moduleMirror = mirror.reflectModule(moduleSymbol) - val instanceMirror = mirror.reflect(moduleMirror.instance) - val method = instanceMirror.reflectMethod(instanceMirror.symbol.toType.declaration(ru.newTermName("isCriticalUp")).asMethod) - val ret = method() - ret.asInstanceOf[Boolean] + def isCriticalUp: Boolean = { + Utils.getContextOrSparkClassLoader.loadClass("org.apache.spark.storage.SnappyMemoryUtils"). + getMethod("isCriticalUp").invoke(null).asInstanceOf[Boolean] } - } diff --git a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala b/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala index 438a0a8807..bab2aa0682 100644 --- a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala +++ b/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala @@ -2,6 +2,7 @@ package org.apache.spark.storage import org.apache.spark.network.{BlockDataManager, BlockTransferService} import org.apache.spark.shuffle.ShuffleManager +import org.apache.spark.util.Utils import org.apache.spark.{SecurityManager, MapOutputTracker, SparkConf} import org.apache.spark.rpc.RpcEnv import org.apache.spark.serializer.Serializer @@ -25,8 +26,10 @@ private[spark] class SnappyBlockManager( extends BlockManager(executorId, rpcEnv, master, defaultSerializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) { + val SNAPPY_MEMORYSTORE = "org.apache.spark.storage.SnappyMemoryStore" + + override protected[spark] val memoryStore = Utils.getContextOrSparkClassLoader. + loadClass(SNAPPY_MEMORYSTORE).getConstructor(classOf[BlockManager], + classOf[Long]).newInstance(this, BlockManager.getMaxMemory(conf): java.lang.Long).asInstanceOf[MemoryStore] +} - override protected[spark] val memoryStore = Class.forName("org.apache.spark.storage.SnappyMemoryStore"). - getConstructors()(0).newInstance(this, BlockManager.getMaxMemory(conf): java.lang.Long). - asInstanceOf[MemoryStore] -} \ No newline at end of file diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala index 16b30b2590..dd61c258a4 100644 --- a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala +++ b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala @@ -14,9 +14,8 @@ import org.apache.spark.SparkEnv class SnappyCriticalUpDUnitTest (s: String) extends ClusterManagerTestBase(s) { def testCriticalUp(): Unit = { - // Lead is started before other servers are started. - vm0.invoke(this.getClass, "startSnappyLead") vm1.invoke(this.getClass, "startSnappyServer") + vm0.invoke(this.getClass, "startSnappyLead") vm2.invoke(this.getClass, "startSnappyServer") // Execute the job @@ -25,12 +24,14 @@ class SnappyCriticalUpDUnitTest (s: String) extends ClusterManagerTestBase(s) { vm2.invoke(this.getClass, "raiseMemoryEvent") vm0.invoke(this.getClass, "runSparkJobWithCriticalUp") + vm1.invoke(this.getClass, "raiseMemoryEvent") + vm2.invoke(this.getClass, "raiseMemoryEvent") vm1.invoke(this.getClass, "assertShuffleMemoryManagerBehavior") vm2.invoke(this.getClass, "assertShuffleMemoryManagerBehavior") vm0.invoke(this.getClass, "stopSnappyLead") - vm1.invoke(this.getClass, "stopSnappyServer") vm2.invoke(this.getClass, "stopSnappyServer") + vm1.invoke(this.getClass, "stopSnappyServer") } } @@ -46,8 +47,6 @@ object SnappyCriticalUpDUnitTest extends ClusterManagerTestUtils { } def runSparkJobWithCriticalUp(): Unit = { - //val snContext = org.apache.spark.sql.SnappyContext(sc) - //raiseMemoryEvent(true) val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() rdd2.count assert(sc.getRDDStorageInfo.isEmpty) diff --git a/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala b/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala index 6662cb3aa0..d39467396e 100644 --- a/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala +++ b/snappy-tools/src/main/scala/io/snappydata/cluster/ExecutorInitiator.scala @@ -27,6 +27,9 @@ import org.apache.spark.{Logging, SparkCallbacks, SparkConf, SparkEnv} */ object ExecutorInitiator extends Logging { + val SNAPPY_BLOCKMANAGER = "org.apache.spark.storage.SnappyBlockManager" + val SNAPPY_SHUFFLEMEMORYMANAGER = "org.apache.spark.shuffle.SnappyShuffleMemoryManager" + var executorRunnable: ExecutorRunnable = new ExecutorRunnable var executorThread: Thread = new Thread(executorRunnable) @@ -117,7 +120,8 @@ object ExecutorInitiator extends Logging { } //TODO: Hemant: add executor specific properties from local conf to //TODO: this conf that was received from driver. - driverConf.set("spark.snappydata.enabled", "true") + driverConf.set("spark.blockManager", SNAPPY_BLOCKMANAGER) + driverConf.set("spark.shuffleMemoryManager", SNAPPY_SHUFFLEMEMORYMANAGER) //TODO: Hemant: get the number of cores from spark conf diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala index 3aa5b7e718..d92d1a1c0b 100644 --- a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala +++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala @@ -12,7 +12,7 @@ private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Lo extends MemoryStore(blockManager, maxMemory) { override def freeMemory: Long = { - if(SnappyMemoryStore.isEvictionUp()) { + if(SnappyMemoryUtils.isEvictionUp) { logInfo(s"Snappy-store EVICTION UP event detected") 0 } else { @@ -26,7 +26,7 @@ private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Lo val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - if (SnappyMemoryStore.isCriticalUp()) { + if (SnappyMemoryUtils.isCriticalUp) { logInfo(s"Will not store $blockIdToAdd as CRITICAL UP event is detected") return ResultWithDroppedBlocks(success = false, droppedBlocks) } @@ -34,16 +34,5 @@ private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Lo } } -object SnappyMemoryStore { - - def isCriticalUp(): Boolean = { - GemFireStore.getBootedInstance != null && GemFireStore.getBootedInstance.thresholdListener.isCritical - } - - def isEvictionUp(): Boolean = { - GemFireStore.getBootedInstance != null && GemFireStore.getBootedInstance.thresholdListener.isEviction - } - -} diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala new file mode 100644 index 0000000000..ed7d092aac --- /dev/null +++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala @@ -0,0 +1,24 @@ +package org.apache.spark.storage + +import com.pivotal.gemfirexd.internal.engine.store.GemFireStore + +/** + * Created by shirishd on 29/10/15. + */ +object SnappyMemoryUtils { + /** + * Checks whether GemFire critical threshold is breached + * @return + */ + def isCriticalUp: Boolean = { + Option(GemFireStore.getBootedInstance).exists(g => g.thresholdListener.isCritical) + } + + /** + * Checks whether GemFire eviction threshold is breached + * @return + */ + def isEvictionUp: Boolean = { + Option(GemFireStore.getBootedInstance).exists(g => g.thresholdListener.isEviction) + } +} From c9a37441f17c51dd68819981634df5d69743a762 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Fri, 30 Oct 2015 19:34:51 +0530 Subject: [PATCH 07/12] Test change --- .../cluster/SnappyCriticalUpDUnitTest.scala | 55 +++++++++++++++++-- 1 file changed, 49 insertions(+), 6 deletions(-) diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala index dd61c258a4..50bea291e8 100644 --- a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala +++ b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala @@ -7,25 +7,24 @@ import com.gemstone.gemfire.internal.cache.control.{HeapMemoryMonitor, InternalR import com.pivotal.gemfirexd.internal.engine.Misc import org.apache.spark.SparkEnv +import org.apache.spark.storage.StorageLevel /** * Created by shirishd on 19/10/15. */ class SnappyCriticalUpDUnitTest (s: String) extends ClusterManagerTestBase(s) { - def testCriticalUp(): Unit = { + def __testCriticalUp(): Unit = { vm1.invoke(this.getClass, "startSnappyServer") vm0.invoke(this.getClass, "startSnappyLead") vm2.invoke(this.getClass, "startSnappyServer") // Execute the job vm0.invoke(this.getClass, "runSparkJob") - vm1.invoke(this.getClass, "raiseMemoryEvent") - vm2.invoke(this.getClass, "raiseMemoryEvent") + vm1.invoke(this.getClass, "raiseCriticalUpMemoryEvent") + vm2.invoke(this.getClass, "raiseCriticalUpMemoryEvent") vm0.invoke(this.getClass, "runSparkJobWithCriticalUp") - vm1.invoke(this.getClass, "raiseMemoryEvent") - vm2.invoke(this.getClass, "raiseMemoryEvent") vm1.invoke(this.getClass, "assertShuffleMemoryManagerBehavior") vm2.invoke(this.getClass, "assertShuffleMemoryManagerBehavior") @@ -34,6 +33,20 @@ class SnappyCriticalUpDUnitTest (s: String) extends ClusterManagerTestBase(s) { vm1.invoke(this.getClass, "stopSnappyServer") } + def testEvictionUp(): Unit = { + vm1.invoke(this.getClass, "startSnappyServer") + vm0.invoke(this.getClass, "startSnappyLead") + + // Execute the job + vm0.invoke(this.getClass, "runSparkJob2") + vm1.invoke(this.getClass, "raiseEvictionUpMemoryEvent") + vm0.invoke(this.getClass, "runSparkJobWithEvictionUp") + + vm0.invoke(this.getClass, "stopSnappyLead") + vm1.invoke(this.getClass, "stopSnappyServer") + + } + } object SnappyCriticalUpDUnitTest extends ClusterManagerTestUtils { @@ -57,7 +70,26 @@ object SnappyCriticalUpDUnitTest extends ClusterManagerTestUtils { assert(SparkEnv.get.shuffleMemoryManager.tryToAcquire(1000) == 0) } - def raiseMemoryEvent(): Unit = { + def runSparkJob2(): Unit = { + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).persist(StorageLevel.MEMORY_ONLY) + rdd1.count + assert(!sc.getRDDStorageInfo.isEmpty) + println("rdd1 " + rdd1.id) + } + + def runSparkJobWithEvictionUp(): Unit = { + val rdd2 = sc.makeRDD(Array(1)).persist(StorageLevel.MEMORY_ONLY) + rdd2.count + assert(!sc.getRDDStorageInfo.isEmpty) + assert(sc.getRDDStorageInfo.size == 1) + + val rdd3 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).persist(StorageLevel.MEMORY_ONLY) + rdd3.count + assert(!sc.getRDDStorageInfo.isEmpty) + assert(sc.getRDDStorageInfo.size == 1) + } + + def raiseCriticalUpMemoryEvent(): Unit = { println("About to raise CRITICAL UP event") val gfCache: GemFireCacheImpl = Misc.getGemFireCache val resMgr: InternalResourceManager = gfCache.getResourceManager @@ -67,6 +99,17 @@ object SnappyCriticalUpDUnitTest extends ClusterManagerTestUtils { resMgr.getHeapMonitor().updateStateAndSendEvent(92); println("CRITICAL UP event sent") + } + + def raiseEvictionUpMemoryEvent(): Unit = { + println("About to raise EVICTION UP event") + val gfCache: GemFireCacheImpl = Misc.getGemFireCache + val resMgr: InternalResourceManager = gfCache.getResourceManager + resMgr.getHeapMonitor.setTestMaxMemoryBytes(100) + HeapMemoryMonitor.setTestBytesUsedForThresholdSet(90) + resMgr.setEvictionHeapPercentage(60) + resMgr.getHeapMonitor().updateStateAndSendEvent(70); + println("EVICTION UP event sent") } } \ No newline at end of file From de37af0b21653e9b84a3ba0c111fa80d011246a3 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Sat, 31 Oct 2015 15:13:29 +0530 Subject: [PATCH 08/12] Test change --- .../io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala index 50bea291e8..b9437896d8 100644 --- a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala +++ b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala @@ -14,7 +14,7 @@ import org.apache.spark.storage.StorageLevel */ class SnappyCriticalUpDUnitTest (s: String) extends ClusterManagerTestBase(s) { - def __testCriticalUp(): Unit = { + def testCriticalUp(): Unit = { vm1.invoke(this.getClass, "startSnappyServer") vm0.invoke(this.getClass, "startSnappyLead") vm2.invoke(this.getClass, "startSnappyServer") From ddfa14444a596427c448867c45b6ea583fc5da5f Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Sat, 31 Oct 2015 16:01:01 +0530 Subject: [PATCH 09/12] Renamed a test file --- ...pDUnitTest.scala => SnappyResourceEventsDUnitTest.scala} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/{SnappyCriticalUpDUnitTest.scala => SnappyResourceEventsDUnitTest.scala} (95%) diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala similarity index 95% rename from snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala rename to snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala index b9437896d8..a80bd3ae67 100644 --- a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyCriticalUpDUnitTest.scala +++ b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala @@ -12,9 +12,9 @@ import org.apache.spark.storage.StorageLevel /** * Created by shirishd on 19/10/15. */ -class SnappyCriticalUpDUnitTest (s: String) extends ClusterManagerTestBase(s) { +class SnappyResourceEventsDUnitTest (s: String) extends ClusterManagerTestBase(s) { - def testCriticalUp(): Unit = { + def __testCriticalUp(): Unit = { vm1.invoke(this.getClass, "startSnappyServer") vm0.invoke(this.getClass, "startSnappyLead") vm2.invoke(this.getClass, "startSnappyServer") @@ -49,7 +49,7 @@ class SnappyCriticalUpDUnitTest (s: String) extends ClusterManagerTestBase(s) { } -object SnappyCriticalUpDUnitTest extends ClusterManagerTestUtils { +object SnappyResourceEventsDUnitTest extends ClusterManagerTestUtils { def runSparkJob(): Unit = { val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() From 0f05355cdaf1bc1fc285f2e9959f0d316545aed3 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Wed, 4 Nov 2015 12:20:36 +0530 Subject: [PATCH 10/12] Test change --- .../SnappyResourceEventsDUnitTest.scala | 111 ++++++++++-------- 1 file changed, 61 insertions(+), 50 deletions(-) diff --git a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala index a80bd3ae67..8502b22537 100644 --- a/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala +++ b/snappy-dunits/src/test/scala/io/snappydata/dunit/cluster/SnappyResourceEventsDUnitTest.scala @@ -5,96 +5,96 @@ import scala.Predef._ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl import com.gemstone.gemfire.internal.cache.control.{HeapMemoryMonitor, InternalResourceManager} import com.pivotal.gemfirexd.internal.engine.Misc +import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils +import dunit.DistributedTestBase +import io.snappydata.ServiceManager import org.apache.spark.SparkEnv -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{RDDInfo, StorageLevel} /** * Created by shirishd on 19/10/15. */ class SnappyResourceEventsDUnitTest (s: String) extends ClusterManagerTestBase(s) { - def __testCriticalUp(): Unit = { - vm1.invoke(this.getClass, "startSnappyServer") - vm0.invoke(this.getClass, "startSnappyLead") - vm2.invoke(this.getClass, "startSnappyServer") + override def tearDown2(): Unit = { + Array(vm3, vm2, vm1, vm0).foreach(_.invoke(this.getClass, "resetGFResourceManager")) + super.tearDown2() + } + + def testCriticalUp(): Unit = { + vm1.invoke(this.getClass, "startSnappyServer", startArgs) + vm0.invoke(this.getClass, "startSnappyLead", startArgs) // Execute the job vm0.invoke(this.getClass, "runSparkJob") vm1.invoke(this.getClass, "raiseCriticalUpMemoryEvent") - vm2.invoke(this.getClass, "raiseCriticalUpMemoryEvent") - vm0.invoke(this.getClass, "runSparkJobWithCriticalUp") + vm0.invoke(this.getClass, "runSparkJobAfterThresholdBreach") vm1.invoke(this.getClass, "assertShuffleMemoryManagerBehavior") - vm2.invoke(this.getClass, "assertShuffleMemoryManagerBehavior") - - vm0.invoke(this.getClass, "stopSnappyLead") - vm2.invoke(this.getClass, "stopSnappyServer") - vm1.invoke(this.getClass, "stopSnappyServer") } def testEvictionUp(): Unit = { - vm1.invoke(this.getClass, "startSnappyServer") - vm0.invoke(this.getClass, "startSnappyLead") + vm1.invoke(this.getClass, "startSnappyServer", startArgs) + vm0.invoke(this.getClass, "startSnappyLead", startArgs) // Execute the job - vm0.invoke(this.getClass, "runSparkJob2") + vm0.invoke(this.getClass, "runSparkJob") vm1.invoke(this.getClass, "raiseEvictionUpMemoryEvent") - vm0.invoke(this.getClass, "runSparkJobWithEvictionUp") - - vm0.invoke(this.getClass, "stopSnappyLead") - vm1.invoke(this.getClass, "stopSnappyServer") + vm0.invoke(this.getClass, "runSparkJobAfterThresholdBreach") } - } object SnappyResourceEventsDUnitTest extends ClusterManagerTestUtils { def runSparkJob(): Unit = { - val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() - rdd1.count + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache() + println(rdd1.count) assert(!sc.getRDDStorageInfo.isEmpty) - rdd1.unpersist(true) - assert(sc.getRDDStorageInfo.isEmpty) - } - - def runSparkJobWithCriticalUp(): Unit = { - val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).cache() - rdd2.count - assert(sc.getRDDStorageInfo.isEmpty) - println(sc.getRDDStorageInfo.length) } def assertShuffleMemoryManagerBehavior(): Unit = { assert(SparkEnv.get.shuffleMemoryManager.tryToAcquire(1000) == 0) } - def runSparkJob2(): Unit = { - val rdd1 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).persist(StorageLevel.MEMORY_ONLY) - rdd1.count - assert(!sc.getRDDStorageInfo.isEmpty) - println("rdd1 " + rdd1.id) + def getInMemorySizeForCachedRDDs: Long = { + val rddInfo: Array[RDDInfo] = sc.getRDDStorageInfo + var sum = 0L; + for (i <- 0 to rddInfo.length - 1) { + sum = sum + rddInfo(i).memSize + } + sum } - def runSparkJobWithEvictionUp(): Unit = { - val rdd2 = sc.makeRDD(Array(1)).persist(StorageLevel.MEMORY_ONLY) - rdd2.count - assert(!sc.getRDDStorageInfo.isEmpty) - assert(sc.getRDDStorageInfo.size == 1) - - val rdd3 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8)).persist(StorageLevel.MEMORY_ONLY) - rdd3.count - assert(!sc.getRDDStorageInfo.isEmpty) - assert(sc.getRDDStorageInfo.size == 1) + def runSparkJobAfterThresholdBreach(): Unit = { + val sum1: Long = getInMemorySizeForCachedRDDs + println("1. cached rdd mem size before caching rdd when critical or eviction up = " + sum1) + + val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache() + println(rdd2.count) + val sum2: Long = getInMemorySizeForCachedRDDs + println("2. cached rdd mem size after caching first rdd when critical or eviction up = " + sum2) + // make sure that after eviction up new rdd being cached does not result in + // increased memory usage + assert(!(sum2 > sum1)) + + val rdd3 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)).cache() + println(rdd3.count) + val sum3: Long = getInMemorySizeForCachedRDDs + println("3. cached rdd mem size after caching second rdd when critical or eviction up = " + sum3) + // make sure that after eviction up new rdd being cached does not result in + // increased memory usage + assert(!(sum3 > sum2)) } - + def raiseCriticalUpMemoryEvent(): Unit = { println("About to raise CRITICAL UP event") val gfCache: GemFireCacheImpl = Misc.getGemFireCache val resMgr: InternalResourceManager = gfCache.getResourceManager + HeapMemoryMonitor.setTestDisableMemoryUpdates(true) resMgr.getHeapMonitor.setTestMaxMemoryBytes(100) - HeapMemoryMonitor.setTestBytesUsedForThresholdSet(90) + HeapMemoryMonitor.setTestBytesUsedForThresholdSet(92) resMgr.setCriticalHeapPercentage(90F) resMgr.getHeapMonitor().updateStateAndSendEvent(92); @@ -105,11 +105,22 @@ object SnappyResourceEventsDUnitTest extends ClusterManagerTestUtils { println("About to raise EVICTION UP event") val gfCache: GemFireCacheImpl = Misc.getGemFireCache val resMgr: InternalResourceManager = gfCache.getResourceManager + HeapMemoryMonitor.setTestDisableMemoryUpdates(true) resMgr.getHeapMonitor.setTestMaxMemoryBytes(100) HeapMemoryMonitor.setTestBytesUsedForThresholdSet(90) - resMgr.setEvictionHeapPercentage(60) - resMgr.getHeapMonitor().updateStateAndSendEvent(70); + resMgr.setEvictionHeapPercentage(40F) + resMgr.getHeapMonitor().updateStateAndSendEvent(85); println("EVICTION UP event sent") } + + def resetGFResourceManager(): Unit = { + val service = ServiceManager.currentFabricServiceInstance + if (service != null) { + val gfCache: GemFireCacheImpl = Misc.getGemFireCache + val resMgr: InternalResourceManager = gfCache.getResourceManager + resMgr.getHeapMonitor.setTestMaxMemoryBytes(0) + resMgr.getHeapMonitor.updateStateAndSendEvent(10) + } + } } \ No newline at end of file From 3a246b2951840d79ce9fac0ddd7e67bfcdad80d3 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Wed, 4 Nov 2015 14:31:27 +0530 Subject: [PATCH 11/12] Using Utils.classForName --- .../scala/org/apache/spark/storage/SnappyBlockManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala b/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala index bab2aa0682..9515eb8668 100644 --- a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala +++ b/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala @@ -28,8 +28,8 @@ private[spark] class SnappyBlockManager( val SNAPPY_MEMORYSTORE = "org.apache.spark.storage.SnappyMemoryStore" - override protected[spark] val memoryStore = Utils.getContextOrSparkClassLoader. - loadClass(SNAPPY_MEMORYSTORE).getConstructor(classOf[BlockManager], - classOf[Long]).newInstance(this, BlockManager.getMaxMemory(conf): java.lang.Long).asInstanceOf[MemoryStore] + override private[spark] val memoryStore = Utils.classForName(SNAPPY_MEMORYSTORE). + getConstructor(classOf[BlockManager], classOf[Long]). + newInstance(this, BlockManager.getMaxMemory(conf): java.lang.Long).asInstanceOf[MemoryStore] } From 0d50b900cc0f12ee5234d1e064ca6110d46b6769 Mon Sep 17 00:00:00 2001 From: Shirish Deshmukh Date: Tue, 10 Nov 2015 12:20:14 +0530 Subject: [PATCH 12/12] Incorporate review comments --- .../spark/shuffle/SnappyShuffleMemoryManager.scala | 10 ++-------- .../apache/spark/storage/SnappyBlockManager.scala | 14 ++++---------- .../apache/spark/storage/SnappyMemoryStore.scala | 5 +---- .../apache/spark/storage/SnappyMemoryUtils.scala | 4 ++-- 4 files changed, 9 insertions(+), 24 deletions(-) rename {snappy-core => snappy-tools}/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala (78%) rename {snappy-core => snappy-tools}/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala (59%) diff --git a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala b/snappy-tools/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala similarity index 78% rename from snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala rename to snappy-tools/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala index b3140201fb..0c47d4f123 100644 --- a/snappy-core/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala +++ b/snappy-tools/src/main/scala/org/apache/spark/shuffle/SnappyShuffleMemoryManager.scala @@ -1,6 +1,6 @@ package org.apache.spark.shuffle -import org.apache.spark.util.Utils +import org.apache.spark.storage.SnappyMemoryUtils /** * Created by shirishd on 15/10/15. @@ -20,18 +20,12 @@ private[spark] class SnappyShuffleMemoryManager protected(override val maxMemory notifyAll() // Will later cause waiting tasks to wake up and check numThreads again } - if (isCriticalUp) { + if (SnappyMemoryUtils.isCriticalUp) { logInfo(s"Will not store $numBytes bytes as CRITICAL UP event is detected") 0 } else { super.tryToAcquire(numBytes) } } - - def isCriticalUp: Boolean = { - Utils.getContextOrSparkClassLoader.loadClass("org.apache.spark.storage.SnappyMemoryUtils"). - getMethod("isCriticalUp").invoke(null).asInstanceOf[Boolean] - } - } diff --git a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala similarity index 59% rename from snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala rename to snappy-tools/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala index 9515eb8668..862a12ec9f 100644 --- a/snappy-core/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala +++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyBlockManager.scala @@ -1,12 +1,10 @@ package org.apache.spark.storage -import org.apache.spark.network.{BlockDataManager, BlockTransferService} -import org.apache.spark.shuffle.ShuffleManager -import org.apache.spark.util.Utils -import org.apache.spark.{SecurityManager, MapOutputTracker, SparkConf} +import org.apache.spark.network.BlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.serializer.Serializer -import org.apache.spark.storage._ +import org.apache.spark.shuffle.ShuffleManager +import org.apache.spark.{MapOutputTracker, SecurityManager, SparkConf} /** * Created by shirishd on 12/10/15. @@ -26,10 +24,6 @@ private[spark] class SnappyBlockManager( extends BlockManager(executorId, rpcEnv, master, defaultSerializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) { - val SNAPPY_MEMORYSTORE = "org.apache.spark.storage.SnappyMemoryStore" - - override private[spark] val memoryStore = Utils.classForName(SNAPPY_MEMORYSTORE). - getConstructor(classOf[BlockManager], classOf[Long]). - newInstance(this, BlockManager.getMaxMemory(conf): java.lang.Long).asInstanceOf[MemoryStore] + override private[spark] val memoryStore = new SnappyMemoryStore(this, BlockManager.getMaxMemory(conf)) } diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala index d92d1a1c0b..ae0d85d1dd 100644 --- a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala +++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryStore.scala @@ -2,8 +2,6 @@ package org.apache.spark.storage import scala.collection.mutable.ArrayBuffer -import com.pivotal.gemfirexd.internal.engine.store.GemFireStore - /** * Created by shirishd on 9/10/15. @@ -24,10 +22,9 @@ private[spark] class SnappyMemoryStore(blockManager: BlockManager, maxMemory: Lo blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - if (SnappyMemoryUtils.isCriticalUp) { logInfo(s"Will not store $blockIdToAdd as CRITICAL UP event is detected") + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] return ResultWithDroppedBlocks(success = false, droppedBlocks) } super.ensureFreeSpace(blockIdToAdd, space) diff --git a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala index ed7d092aac..6cbe186278 100644 --- a/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala +++ b/snappy-tools/src/main/scala/org/apache/spark/storage/SnappyMemoryUtils.scala @@ -11,7 +11,7 @@ object SnappyMemoryUtils { * @return */ def isCriticalUp: Boolean = { - Option(GemFireStore.getBootedInstance).exists(g => g.thresholdListener.isCritical) + Option(GemFireStore.getBootingInstance).exists(g => g.thresholdListener.isCritical) } /** @@ -19,6 +19,6 @@ object SnappyMemoryUtils { * @return */ def isEvictionUp: Boolean = { - Option(GemFireStore.getBootedInstance).exists(g => g.thresholdListener.isEviction) + Option(GemFireStore.getBootingInstance).exists(g => g.thresholdListener.isEviction) } }