diff --git a/core/src/main/scala/org/apache/spark/ColumnPartitionData.scala b/core/src/main/scala/org/apache/spark/ColumnPartitionData.scala
index 5c53f0a4fe..31d78969fb 100644
--- a/core/src/main/scala/org/apache/spark/ColumnPartitionData.scala
+++ b/core/src/main/scala/org/apache/spark/ColumnPartitionData.scala
@@ -85,7 +85,7 @@ class ColumnPartitionData[T](
   // Extracted to a function for use in deserialization
   private def initialize {
     pointers = schema.columns.map { col =>
-      SparkEnv.get.heapMemoryAllocator.allocatePinnedMemory(col.columnType.bytes * size)
+      SparkEnv.get.heapMemoryAllocator.allocateMemory(col.columnType.bytes * size)
     }
 
     refCounter = 1
@@ -96,7 +96,7 @@ class ColumnPartitionData[T](
     if (blobs == null) {
       blobs = new Array(1)
     }
-    val ptr = SparkEnv.get.heapMemoryAllocator.allocatePinnedMemory(blobSize)
+    val ptr = SparkEnv.get.heapMemoryAllocator.allocateMemory(blobSize)
     blobs(0) = ptr
     if (blobBuffers == null) {
       blobBuffers = new Array(1)
@@ -148,9 +148,9 @@ class ColumnPartitionData[T](
     assert(refCounter > 0)
     refCounter -= 1
     if (refCounter == 0) {
-      pointers.foreach(SparkEnv.get.heapMemoryAllocator.freePinnedMemory(_))
+      pointers.foreach(SparkEnv.get.heapMemoryAllocator.freeMemory(_))
       if (blobs != null) {
-        blobs.foreach(SparkEnv.get.heapMemoryAllocator.freePinnedMemory(_))
+        blobs.foreach(SparkEnv.get.heapMemoryAllocator.freeMemory(_))
       }
       freeGPUPointers()
     }
@@ -608,7 +608,7 @@ class ColumnPartitionData[T](
     while (i < blobBuffersSize) {
       val blobSize = in.readLong()
       var blobOffset: Long = 8
-      val ptr = SparkEnv.get.heapMemoryAllocator.allocatePinnedMemory(blobSize)
+      val ptr = SparkEnv.get.heapMemoryAllocator.allocateMemory(blobSize)
       blobs(i) = ptr
       val byteBuffer = ptr.getByteBuffer(0, blobSize).order(ByteOrder.LITTLE_ENDIAN)
       blobBuffers(i) = byteBuffer
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3e525a572f..a736ec1ce7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -203,8 +203,16 @@ abstract class RDD[T: ClassTag](
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   def cache(): this.type = persist()
 
-  def cacheGpu() : RDD[T] = { sc.env.gpuMemoryManager.cacheGPUSlaves(id); this }
-  def unCacheGpu() : RDD[T] = { sc.env.gpuMemoryManager.unCacheGPUSlaves(id); this }
+  def cacheGpu() : RDD[T] = {
+    sc.env.gpuMemoryManager.cacheGPUSlaves(id);
+    storageGpuLevel = StorageLevel.MEMORY_ONLY
+    this
+  }
+  def unCacheGpu() : RDD[T] = {
+    sc.env.gpuMemoryManager.unCacheGPUSlaves(id);
+    storageGpuLevel = StorageLevel.NONE
+    this
+  }
 
   /**
    * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
@@ -294,6 +302,8 @@ abstract class RDD[T: ClassTag](
   final def partitionData(split: Partition, context: TaskContext): PartitionData[T] = {
     if (storageLevel != StorageLevel.NONE) {
       SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
+    } else if (storageGpuLevel != StorageLevel.NONE) {
+      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageGpuLevel)
     } else {
       computeOrReadCheckpoint(split, context)
     }
@@ -1717,6 +1727,7 @@ abstract class RDD[T: ClassTag](
   // =======================================================================
 
   private var storageLevel: StorageLevel = StorageLevel.NONE
+  private var storageGpuLevel: StorageLevel = StorageLevel.NONE
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = sc.getCallSite()
diff --git a/core/src/test/scala/org/apache/spark/cuda/CUDAFunctionSuite.scala b/core/src/test/scala/org/apache/spark/cuda/CUDAFunctionSuite.scala
index db490314a5..29f043e081 100644
--- a/core/src/test/scala/org/apache/spark/cuda/CUDAFunctionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/cuda/CUDAFunctionSuite.scala
@@ -954,17 +954,22 @@ BB_RET:
         Some((size: Long) => 1),
         Some(dimensions)))
 
-      def generateData: Array[DataPoint] = {
+      def generateData(seed: Int, N: Int, D: Int, R: Double): DataPoint = {
+        val r = new Random(seed)
         def generatePoint(i: Int): DataPoint = {
           val y = if (i % 2 == 0) -1 else 1
-          val x = Array.fill(D){rand.nextGaussian + y * R}
+          val x = Array.fill(D){r.nextGaussian + y * R}
           DataPoint(x, y)
         }
-        Array.tabulate(N)(generatePoint)
+        generatePoint(seed)
       }
 
-      val pointsCached = sc.parallelize(generateData, numSlices).cache()
-      val pointsColumnCached = pointsCached.convert(ColumnFormat, false).cache().cacheGpu()
+      val skelton = sc.parallelize((1 to N), numSlices)
+      val pointsCached = skelton.map(i => generateData(i, N, D, R)).cache
+      pointsCached.count()
+
+      val pointsColumnCached = pointsCached.convert(ColumnFormat, false).cacheGpu()
+      pointsColumnCached.count()
 
       // Initialize w to a random value
       var wCPU = Array.fill(D){2 * rand.nextDouble - 1}
@@ -1031,7 +1036,7 @@ BB_RET:
      assert(r2.sameElements(r1.map(mulby2)))
 
      // UncacheGPU should clear the GPU cache.
-     baseRDD.unCacheGpu().unCacheGpu()
+     baseRDD.unCacheGpu()
      r1 = baseRDD.mapExtFunc((x: Int) => 2 * x, mapFunction).collect()
      r2 = baseRDD.mapExtFunc((x: Int) => 2 * x, mapFunction).collect()
      assert(r2.sameElements(r2))
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkGPULR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkGPULR.scala
index 8829d30b7d..7582ddceab 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkGPULR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkGPULR.scala
@@ -102,8 +102,10 @@ object SparkGPULR {
         Some(dimensions)))
 
     val skelton = sc.parallelize((1 to N), numSlices)
-    val points = skelton.map(i => generateData(i, N, D, R))
-    val pointsColumnCached = points.convert(ColumnFormat).cache().cacheGpu()
+    val points = skelton.map(i => generateData(i, N, D, R)).cache
+    points.count()
+
+    val pointsColumnCached = points.convert(ColumnFormat).cacheGpu()
     pointsColumnCached.count()
 
     // Initialize w to a random value
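For reviewers, a minimal usage sketch (not part of the patch) of the reworked GPU caching path, following the pattern the patch itself applies in SparkGPULR.scala and CUDAFunctionSuite.scala; the SparkContext `sc`, the `generateData` helper, and the values `N`, `D`, `R`, and `numSlices` are assumed to be set up as in those files:

    // Assumes the GPU-enabled Spark fork this patch targets (convert/ColumnFormat,
    // cacheGpu/unCacheGpu, mapExtFunc); everything else is stock Spark.
    val points = sc.parallelize(1 to N, numSlices)
      .map(i => generateData(i, N, D, R))
      .cache()
    points.count()                      // materialize the CPU-side cache first

    // Column-format copy pinned on the GPU via the new storageGpuLevel path
    val pointsColumnCached = points.convert(ColumnFormat).cacheGpu()
    pointsColumnCached.count()          // materialize the GPU-side cache

    // ... run GPU stages against pointsColumnCached, e.g. with mapExtFunc(...) ...

    pointsColumnCached.unCacheGpu()     // release the GPU copy when finished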