18 changes: 7 additions & 11 deletions core/src/main/scala/org/apache/spark/ColumnPartitionData.scala
@@ -17,8 +17,6 @@

package org.apache.spark

import jcuda.driver.CUmodule
import jcuda.runtime.{cudaStream_t, cudaMemcpyKind, JCuda}
import org.apache.spark.storage.{RDDBlockId, BlockId}

import math._
@@ -31,11 +29,9 @@ import java.nio.{ByteBuffer, ByteOrder}
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.IteratorFunctions._
import org.apache.spark.util.Utils

import jcuda.Pointer
import org.apache.spark.unsafe.memory.Pointer

import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -61,6 +57,7 @@ class ColumnPartitionData[T](
private var refCounter = 1

// TODO blockId can never be NULL, modify the testcase to pass valid blockId and remove (0,0).
val cudaManager = SparkEnv.get.cudaManager
var blockId : Option[BlockId] = Some(RDDBlockId(0, 0))
def rddId : Int = blockId.getOrElse(RDDBlockId(0, 0)).asRDDId.get.rddId
def cachedGPUPointers : HashMap[String, Pointer] =
@@ -194,7 +191,7 @@ class ColumnPartitionData[T](
order.map(columnsByAccessors(_))
}

private[spark] def orderedGPUPointers(order: Seq[String], stream : cudaStream_t):
private[spark] def orderedGPUPointers(order: Seq[String], devIx: Int):
Vector[Pointer] = {
var gpuPtrs = Vector[Pointer]()
var gpuBlobs = Vector[Pointer]()
@@ -204,7 +201,7 @@
val inPointers = orderedPointers(order)
for ((col, name, cpuPtr) <- (inColumns, order, inPointers).zipped) {
gpuPtrs = gpuPtrs :+ cachedGPUPointers.getOrElseUpdate(blockId.get + name, {
val gpuPtr = SparkEnv.get.cudaManager.allocGPUMemory(col.memoryUsage(size))
val gpuPtr = cudaManager.allocGPUMemory(col.memoryUsage(size))
memCpys = memCpys :+ (gpuPtr, cpuPtr, col.memoryUsage(size))
gpuPtr
})
@@ -215,7 +212,7 @@
for ((blob, name, cpuPtr) <-
(inBlobBuffers, (1 to inBlobBuffers.length).map(_.toString), inBlobs).zipped) {
gpuBlobs = gpuBlobs :+ cachedGPUPointers.getOrElseUpdate(blockId.get + name, {
val gpuPtr = SparkEnv.get.cudaManager.allocGPUMemory(blob.capacity())
val gpuPtr = cudaManager.allocGPUMemory(blob.capacity())
memCpys = memCpys :+ (gpuPtr, cpuPtr, blob.capacity().toLong)
gpuPtr
})
@@ -233,8 +230,7 @@
*/

for ((gpuPtr, cpuPtr, length) <- memCpys) {
JCuda.cudaMemcpyAsync(gpuPtr, cpuPtr, length,
cudaMemcpyKind.cudaMemcpyHostToDevice, stream)
cudaManager.memcpyH2DASync(gpuPtr, cpuPtr, length, devIx)
}

gpuPtrs ++ gpuBlobs
@@ -244,7 +240,7 @@
if (!gpuCache) {
for ((name, ptr) <- cachedGPUPointers) {
if (name.startsWith(blockId.get.toString)) {
SparkEnv.get.cudaManager.freeGPUMemory(ptr)
cudaManager.freeGPUMemory(ptr)
cachedGPUPointers.remove(name)
}
}
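The ColumnPartitionData changes above cache the CUDAManager once per partition and route all host-to-device traffic through it, keyed by a device index instead of a raw cudaStream_t. The snippet below is a minimal, self-contained sketch of that caching-plus-batched-copy pattern; GpuManager, GpuColumnCache, and the simplified signatures are illustrative stand-ins inferred from this diff, not the PR's actual classes.

```scala
import scala.collection.mutable.HashMap

// Hypothetical stand-ins for types that live in this fork of Spark; they are
// assumptions used only to make the sketch compile on its own.
final class Pointer
trait GpuManager {
  def allocGPUMemory(size: Long): Pointer
  def memcpyH2DASync(dst: Pointer, src: Pointer, length: Long, devIx: Int): Unit
}

// Pattern used by orderedGPUPointers: reuse a cached device buffer per
// (blockId, column) key, record the copies that became necessary, then issue
// them asynchronously against the chosen device index.
class GpuColumnCache(manager: GpuManager, blockKey: String) {
  private val cachedGPUPointers = HashMap.empty[String, Pointer]

  // columns: (name, host pointer, byte size) for each input column.
  def pointersFor(columns: Seq[(String, Pointer, Long)], devIx: Int): Vector[Pointer] = {
    var copies = Vector.empty[(Pointer, Pointer, Long)]
    val gpuPtrs = columns.map { case (name, cpuPtr, bytes) =>
      cachedGPUPointers.getOrElseUpdate(blockKey + name, {
        val gpuPtr = manager.allocGPUMemory(bytes)    // allocate only on a cache miss
        copies = copies :+ ((gpuPtr, cpuPtr, bytes))  // remember the pending H2D copy
        gpuPtr
      })
    }.toVector
    for ((gpuPtr, cpuPtr, bytes) <- copies) {
      manager.memcpyH2DASync(gpuPtr, cpuPtr, bytes, devIx) // async copy on that device
    }
    gpuPtrs
  }
}
```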
80 changes: 31 additions & 49 deletions core/src/main/scala/org/apache/spark/cuda/CUDAFunction.scala
@@ -15,6 +15,7 @@
* limitations under the License.
*/


package org.apache.spark.cuda

import org.apache.spark.storage.BlockId
@@ -25,20 +26,14 @@ import java.net.URL
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}

import jcuda.Pointer
import jcuda.driver.CUfunction
import jcuda.driver.CUmodule
import jcuda.driver.CUstream
import jcuda.driver.JCudaDriver
import jcuda.runtime.cudaStream_t
import jcuda.runtime.cudaMemcpyKind
import jcuda.runtime.JCuda

import org.apache.commons.io.IOUtils
import org.apache.spark.rdd.ExternalFunction
import org.apache.spark.{PartitionData, ColumnPartitionData, ColumnPartitionSchema, SparkEnv,
SparkException}
import org.apache.spark.util.Utils
import org.apache.spark.unsafe.memory.Pointer

/**
* A CUDA kernel wrapper. Contains CUDA module, information how to extract CUDA kernel from it and
@@ -86,28 +81,20 @@ class CUDAFunction(
outputArraySizes: Seq[Long] = null,
inputFreeVariables: Seq[Any] = null,
blockId : Option[BlockId] = None): ColumnPartitionData[U] = {
val cudaManager = SparkEnv.get.cudaManager
val outputSchema = ColumnPartitionSchema.schemaFor[U]

// TODO add array size
val memoryUsage = (if (in.gpuCached) 0 else in.memoryUsage) + outputSchema.memoryUsage(in.size)

val streamDevIx = SparkEnv.get.cudaManager.getStream(memoryUsage, in.gpuDevIx)
val stream = streamDevIx._1
val devIx = cudaManager.getDevice(memoryUsage, in.gpuDevIx)
if (in.gpuCache) {
in.gpuDevIx = streamDevIx._2
in.gpuDevIx = devIx
}

// TODO cache the function if there is a chance that after a deserialization kernel gets called
// multiple times - but only if no synchronization is needed for that
val module = resource match {
case url: URL =>
SparkEnv.get.cudaManager.cachedLoadModule(Left(url))
case (name: String, ptx: String) =>
SparkEnv.get.cudaManager.cachedLoadModule(Right(name, ptx))
case _ => throw new SparkException("Unsupported resource type for CUDAFunction")
}
val function = new CUfunction
JCudaDriver.cuModuleGetFunction(function, module, kernelSignature)
val function = cudaManager.moduleGetFunction(resource, kernelSignature)

val actualOutputSize = outputSize.getOrElse(in.size)
val out = if (outputArraySizes == null) {
@@ -127,8 +114,8 @@
val outColumns = out.schema.orderedColumns(outputColumnsOrder)
for (col <- outColumns) {
val size = col.memoryUsage(out.size)
val gpuPtr = SparkEnv.get.cudaManager.allocGPUMemory(size)
JCuda.cudaMemsetAsync(gpuPtr, 0, size, stream)
val gpuPtr = cudaManager.allocGPUMemory(size)
cudaManager.memsetASync(gpuPtr, 0, size, devIx)
gpuOutputPtrs = gpuOutputPtrs :+ gpuPtr
}

@@ -144,31 +131,31 @@
case v: Array[Byte] =>
val len = v.length
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Char] =>
val len = v.length * 2
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Short] =>
val len = v.length * 2
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Int] =>
val len = v.length * 4
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Long] =>
val len = v.length * 8
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Float] =>
val len = v.length * 4
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case v: Array[Double] =>
val len = v.length * 8
cpuInputFreeVars = cpuInputFreeVars :+ (Pointer.to(v), len)
SparkEnv.get.cudaManager.allocGPUMemory(len)
cudaManager.allocGPUMemory(len)
case _ => throw new SparkException("Unsupported type passed to kernel "
+ "as a free variable argument")
}
@@ -180,17 +167,16 @@
if (out.blobBuffers != null) {out.blobBuffers} else {Array[ByteBuffer]()}
for (blob <- outBlobBuffers) {
val size = blob.capacity()
val gpuPtr = SparkEnv.get.cudaManager.allocGPUMemory(size)
JCuda.cudaMemsetAsync(gpuPtr, 0, size, stream)
val gpuPtr = cudaManager.allocGPUMemory(size)
cudaManager.memsetASync(gpuPtr, 0, size, devIx)
gpuOutputBlobs = gpuOutputBlobs :+ gpuPtr
}

// perform allocGPUMemory and cudaMemcpyAsync
val gpuInputPtrs = in.orderedGPUPointers(inputColumnsOrder, stream)
val gpuInputPtrs = in.orderedGPUPointers(inputColumnsOrder, devIx)

for (((cpuPtr, size), gpuPtr) <- (cpuInputFreeVars zip inputFreeVarPtrs)) {
JCuda.cudaMemcpyAsync(gpuPtr, cpuPtr, size,
cudaMemcpyKind.cudaMemcpyHostToDevice, stream)
cudaManager.memcpyH2DASync(gpuPtr, cpuPtr, size, devIx)
}

val gpuPtrParams = (gpuInputPtrs ++
@@ -208,8 +194,6 @@
+ "argument")
}

val wrappedStream = new CUstream(stream)

stagesCount match {
// normal launch, no stages, suitable for map
case None =>
@@ -220,16 +204,16 @@

val (gpuGridSize, gpuBlockSize) = dimensions match {
case Some(computeDim) => computeDim(in.size, 1)
case None => SparkEnv.get.cudaManager.computeDimensions(in.size)
case None => cudaManager.computeDimensions(in.size)
}

JCudaDriver.cuLaunchKernel(
cudaManager.launchKernel(
function,
gpuGridSize, 1, 1,
gpuBlockSize, 1, 1,
0,
wrappedStream,
kernelParameters, null)
devIx,
kernelParameters)

// launch kernel multiple times (multiple stages), suitable for reduce
case Some(totalStagesFun) =>
@@ -253,39 +237,37 @@
throw new SparkException("Dimensions must be provided for multi-stage kernels")
}

JCudaDriver.cuLaunchKernel(
cudaManager.launchKernel(
function,
gpuGridSize, 1, 1,
gpuBlockSize, 1, 1,
0,
wrappedStream,
kernelParameters, null)
devIx,
kernelParameters)
}
}

val outPointers = out.orderedPointers(outputColumnsOrder)
for ((cpuPtr, gpuPtr, col) <- (outPointers, gpuOutputPtrs, outColumns).zipped) {
JCuda.cudaMemcpyAsync(cpuPtr, gpuPtr, col.memoryUsage(out.size),
cudaMemcpyKind.cudaMemcpyDeviceToHost, stream)
cudaManager.memcpyD2HASync(cpuPtr, gpuPtr, col.memoryUsage(out.size), devIx)
}

for ((cpuPtr, gpuPtr, blob) <- (outBlobs, gpuOutputBlobs, outBlobBuffers).zipped) {
JCuda.cudaMemcpyAsync(cpuPtr, gpuPtr, blob.capacity(),
cudaMemcpyKind.cudaMemcpyDeviceToHost, stream)
cudaManager.memcpyD2HASync(cpuPtr, gpuPtr, blob.capacity(), devIx)
}

if (!in.gpuCache || ((gpuOutputPtrs.size + gpuOutputBlobs.size) > 0)) {
JCuda.cudaStreamSynchronize(stream)
cudaManager.streamSynchronize(devIx)
}
out.blockId = blockId
out
} {
in.freeGPUPointers()
for (ptr <- gpuOutputPtrs) {
SparkEnv.get.cudaManager.freeGPUMemory(ptr)
cudaManager.freeGPUMemory(ptr)
}
for (ptr <- gpuOutputBlobs) {
SparkEnv.get.cudaManager.freeGPUMemory(ptr)
cudaManager.freeGPUMemory(ptr)
}
}
} catch {
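Taken together, the CUDAFunction changes replace direct JCudaDriver/JCuda calls with a single manager facade addressed by a device index. The trait below is a hypothetical sketch of that surface, with method names copied from the call sites in this diff; the parameter types, the Pointer and KernelHandle stand-ins, and the exact signatures are assumptions, not the fork's real CUDAManager API.

```scala
// Hypothetical facade collecting the CUDAManager entry points that the new
// CUDAFunction call sites appear to rely on.
object CudaManagerFacadeSketch {
  final class Pointer        // stand-in for org.apache.spark.unsafe.memory.Pointer
  final class KernelHandle   // stand-in for whatever moduleGetFunction returns

  trait CudaManagerFacade {
    // Picks a device with enough free memory; replaces getStream/cudaStream_t.
    def getDevice(memoryUsage: Long, preferredDevIx: Int): Int

    // Device memory management.
    def allocGPUMemory(size: Long): Pointer
    def freeGPUMemory(ptr: Pointer): Unit

    // Asynchronous transfers and fills, scheduled on the chosen device's stream.
    def memcpyH2DASync(dst: Pointer, src: Pointer, length: Long, devIx: Int): Unit
    def memcpyD2HASync(dst: Pointer, src: Pointer, length: Long, devIx: Int): Unit
    def memsetASync(ptr: Pointer, value: Int, length: Long, devIx: Int): Unit

    // Kernel resolution and launch (resource is a URL or a (name, ptx) pair).
    def moduleGetFunction(resource: Any, kernelSignature: String): KernelHandle
    def launchKernel(
        function: KernelHandle,
        gridX: Int, gridY: Int, gridZ: Int,
        blockX: Int, blockY: Int, blockZ: Int,
        sharedMemBytes: Int,
        devIx: Int,
        kernelParameters: Pointer): Unit

    // Default grid/block sizing and end-of-kernel synchronization.
    def computeDimensions(size: Long): (Int, Int)
    def streamSynchronize(devIx: Int): Unit
  }
}
```

Routing every call through one facade keyed by a device index keeps the jcuda dependency confined to the manager class, which appears to be the motivation for this refactor.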