Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ class CudaContext::CudaContextImpl {
return Status::OK();
}

// Copy `nbytes` bytes of device memory from `src` (allocated in this
// context) to `dst` (allocated in `dst_ctx`, possibly on a different GPU)
// using the driver's peer-to-peer copy.
//
// \param[in] dst_ctx context that owns the destination memory
// \param[in] dst destination device address
// \param[in] src source device address (owned by this context)
// \param[in] nbytes number of bytes to copy
// \return Status
Status CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>& dst_ctx, void* dst,
                                 const void* src, int64_t nbytes) {
  ContextSaver set_temporary(context_);
  // cuMemcpyPeer argument order: dst ptr, dst context, src ptr, src
  // context, byte count. CUdeviceptr is an integer type, so the cast
  // result is a prvalue; the `const` qualifier on the cast target was
  // superfluous and is dropped here.
  CU_RETURN_NOT_OK(cuMemcpyPeer(reinterpret_cast<CUdeviceptr>(dst),
                                reinterpret_cast<CUcontext>(dst_ctx->handle()),
                                reinterpret_cast<CUdeviceptr>(src), context_,
                                static_cast<size_t>(nbytes)));
  return Status::OK();
}

Status Synchronize(void) {
ContextSaver set_temporary(context_);
CU_RETURN_NOT_OK(cuCtxSynchronize());
Expand Down Expand Up @@ -301,6 +311,12 @@ Status CudaContext::CopyDeviceToDevice(void* dst, const void* src, int64_t nbyte
return impl_->CopyDeviceToDevice(dst, src, nbytes);
}

// Public entry point for cross-device (peer-to-peer) copies: forwards to
// the pimpl. `src` must be device memory allocated in this context and
// `dst` device memory allocated in `dst_ctx`.
Status CudaContext::CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>& dst_ctx,
                                              void* dst, const void* src,
                                              int64_t nbytes) {
  return impl_->CopyDeviceToAnotherDevice(dst_ctx, dst, src, nbytes);
}

// Forwards to the pimpl, which issues cuCtxSynchronize for this context.
Status CudaContext::Synchronize(void) { return impl_->Synchronize(); }

// Forwards to the pimpl; presumably releases the underlying driver
// context — confirm in CudaContextImpl::Close (not visible in this hunk).
Status CudaContext::Close() { return impl_->Close(); }
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
Status CopyHostToDevice(void* dst, const void* src, int64_t nbytes);
Status CopyDeviceToHost(void* dst, const void* src, int64_t nbytes);
Status CopyDeviceToDevice(void* dst, const void* src, int64_t nbytes);
Status CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>& dst_ctx, void* dst,
const void* src, int64_t nbytes);
Status Free(void* device_ptr, int64_t nbytes);

class CudaContextImpl;
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/gpu/cuda_memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ Status CudaBuffer::CopyFromDevice(const int64_t position, const void* data,
return context_->CopyDeviceToDevice(mutable_data_ + position, data, nbytes);
}

// Copy `nbytes` bytes from device memory `data` (allocated in `src_ctx`,
// possibly on another GPU) into this buffer starting at byte offset
// `position`. The copy is driven by the *source* context, matching
// CudaContext::CopyDeviceToAnotherDevice's argument order.
// NOTE: the overflow check is a DCHECK, so it only fires in debug builds
// (same convention as the sibling CopyFromDevice); callers must pass a
// valid (position, nbytes) range.
Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
                                         const int64_t position, const void* data,
                                         int64_t nbytes) {
  DCHECK_LE(nbytes, size_ - position) << "Copy would overflow buffer";
  return src_ctx->CopyDeviceToAnotherDevice(context_, mutable_data_ + position, data,
                                            nbytes);
}

Status CudaBuffer::ExportForIpc(std::shared_ptr<CudaIpcMemHandle>* handle) {
if (is_ipc_) {
return Status::Invalid("Buffer has already been exported for IPC");
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/arrow/gpu/cuda_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
Status CopyToHost(const int64_t position, const int64_t nbytes, void* out) const;

/// \brief Copy memory to device at position
/// \param[in] position start position to copy bytes
/// \param[in] position start position to copy bytes to
/// \param[in] data the host data to copy
/// \param[in] nbytes number of bytes to copy
/// \return Status
Expand All @@ -80,6 +80,15 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
/// memories have been allocated within the same context.
Status CopyFromDevice(const int64_t position, const void* data, int64_t nbytes);

/// \brief Copy memory from another device to device at position
/// \param[in] src_ctx context of the source device memory
/// \param[in] position start position inside buffer to copy bytes to
/// \param[in] data start address of the source device memory area to copy from
/// \param[in] nbytes number of bytes to copy
/// \return Status
Status CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx,
const int64_t position, const void* data, int64_t nbytes);

/// \brief Expose this device buffer as IPC memory which can be used in other processes
/// \param[out] handle the exported IPC handle
/// \return Status
Expand Down
23 changes: 13 additions & 10 deletions python/pyarrow/_cuda.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -561,15 +561,12 @@ cdef class CudaBuffer(Buffer):
def copy_from_device(self, buf, int64_t position=0, int64_t nbytes=-1):
"""Copy data from device to device.

The destination device buffer must be pre-allocated within the
same context as source device buffer.

Parameters
----------
buf : CudaBuffer
Specify source device buffer.
position : int
Specify the starting position of the copy in devive buffer.
Specify the starting position of the copy in the device buffer.
Default: 0.
nbytes : int
Specify the number of bytes to copy. Default: -1 (all from
Expand All @@ -581,9 +578,6 @@ cdef class CudaBuffer(Buffer):
Number of bytes copied.

"""
if self.context.handle != buf.context.handle:
raise ValueError('device source and destination buffers must be '
'within the same context')
if position < 0 or position > self.size:
raise ValueError('position argument is out-of-range')
cdef int64_t nbytes_
Expand All @@ -605,9 +599,18 @@ cdef class CudaBuffer(Buffer):

cdef shared_ptr[CCudaBuffer] buf_ = pyarrow_unwrap_cudabuffer(buf)
cdef int64_t position_ = position
with nogil:
check_status(self.cuda_buffer.get().
CopyFromDevice(position_, buf_.get().data(), nbytes_))
cdef shared_ptr[CCudaContext] src_ctx_ = pyarrow_unwrap_cudacontext(
buf.context)
if self.context.handle != buf.context.handle:
with nogil:
check_status(self.cuda_buffer.get().
CopyFromAnotherDevice(src_ctx_, position_,
buf_.get().data(), nbytes_))
else:
with nogil:
check_status(self.cuda_buffer.get().
CopyFromDevice(position_, buf_.get().data(),
nbytes_))
return nbytes_

def export_for_ipc(self):
Expand Down
3 changes: 3 additions & 0 deletions python/pyarrow/includes/libarrow_cuda.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil:
int64_t nbytes)
CStatus CopyFromDevice(const int64_t position, const void* data,
int64_t nbytes)
CStatus CopyFromAnotherDevice(const shared_ptr[CCudaContext]& src_ctx,
const int64_t position, const void* data,
int64_t nbytes)
CStatus ExportForIpc(shared_ptr[CCudaIpcMemHandle]* handle)
shared_ptr[CCudaContext] context() const

Expand Down
14 changes: 12 additions & 2 deletions python/pyarrow/tests/test_cuda.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
reason='CUDA IPC not supported in platform `%s`' % (platform))

global_context = None # for flake8
global_context1 = None # for flake8


def setup_module(module):
    # Primary test context on device 0.
    module.global_context = cuda.Context(0)
    # Secondary context on the last available device. On a single-GPU
    # machine this is the same device as global_context, and the
    # cross-device tests detect that and skip.
    module.global_context1 = cuda.Context(cuda.Context.get_num_devices() - 1)


def teardown_module(module):
Expand All @@ -53,6 +55,7 @@ def teardown_module(module):
def test_Context():
assert cuda.Context.get_num_devices() > 0
assert global_context.device_number == 0
assert global_context1.device_number == cuda.Context.get_num_devices() - 1

with pytest.raises(ValueError,
match=("device_number argument must "
Expand Down Expand Up @@ -398,11 +401,18 @@ def test_copy_to_host(size):
dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes)


@pytest.mark.parametrize("dest_ctx", ['same', 'another'])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_device(size):
def test_copy_from_device(dest_ctx, size):
arr, buf = make_random_buffer(size=size, target='device')
lst = arr.tolist()
dbuf = buf.context.new_buffer(size)
if dest_ctx == 'another':
dest_ctx = global_context1
if buf.context.device_number == dest_ctx.device_number:
pytest.skip("not a multi-GPU system")
else:
dest_ctx = buf.context
dbuf = dest_ctx.new_buffer(size)

def put(*args, **kwargs):
dbuf.copy_from_device(buf, *args, **kwargs)
Expand Down