diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index 2f3f1bd3f10..93caa524167 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -112,6 +112,16 @@ class CudaContext::CudaContextImpl { return Status::OK(); } + Status CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>& dst_ctx, void* dst, + const void* src, int64_t nbytes) { + ContextSaver set_temporary(context_); + CU_RETURN_NOT_OK(cuMemcpyPeer(reinterpret_cast<CUdeviceptr>(dst), + reinterpret_cast<CUcontext>(dst_ctx->handle()), + reinterpret_cast<CUdeviceptr>(src), context_, + static_cast<size_t>(nbytes))); + return Status::OK(); + } + Status Synchronize(void) { ContextSaver set_temporary(context_); CU_RETURN_NOT_OK(cuCtxSynchronize()); @@ -301,6 +311,12 @@ Status CudaContext::CopyDeviceToDevice(void* dst, const void* src, int64_t nbyte return impl_->CopyDeviceToDevice(dst, src, nbytes); } +Status CudaContext::CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>& dst_ctx, + void* dst, const void* src, + int64_t nbytes) { + return impl_->CopyDeviceToAnotherDevice(dst_ctx, dst, src, nbytes); +} + Status CudaContext::Synchronize(void) { return impl_->Synchronize(); } Status CudaContext::Close() { return impl_->Close(); } diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index 938a81561d0..682cbd87fe3 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -141,6 +141,8 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext> + Status CopyDeviceToAnotherDevice(const std::shared_ptr<CudaContext>& dst_ctx, void* dst, + const void* src, int64_t nbytes); Status Free(void* device_ptr, int64_t nbytes); class CudaContextImpl; diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index a0da580acf9..e91fef2eb40 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -163,6 +163,14 @@ Status CudaBuffer::CopyFromDevice(const int64_t position, const void* data, return context_->CopyDeviceToDevice(mutable_data_ + position, data, nbytes); } 
+Status CudaBuffer::CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx, + const int64_t position, const void* data, + int64_t nbytes) { + DCHECK_LE(nbytes, size_ - position) << "Copy would overflow buffer"; + return src_ctx->CopyDeviceToAnotherDevice(context_, mutable_data_ + position, data, + nbytes); +} + Status CudaBuffer::ExportForIpc(std::shared_ptr<CudaIpcMemHandle>* handle) { if (is_ipc_) { return Status::Invalid("Buffer has already been exported for IPC"); diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index 193deed82e5..6b9f04cc6de 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -64,7 +64,7 @@ class ARROW_EXPORT CudaBuffer : public Buffer { Status CopyToHost(const int64_t position, const int64_t nbytes, void* out) const; /// \brief Copy memory to device at position - /// \param[in] position start position to copy bytes + /// \param[in] position start position to copy bytes to /// \param[in] data the host data to copy /// \param[in] nbytes number of bytes to copy /// \return Status @@ -80,6 +80,15 @@ class ARROW_EXPORT CudaBuffer : public Buffer { /// memories have been allocated within the same context. 
Status CopyFromDevice(const int64_t position, const void* data, int64_t nbytes); + /// \brief Copy memory from another device to device at position + /// \param[in] src_ctx context of the source device memory + /// \param[in] position start position inside buffer to copy bytes to + /// \param[in] data start address of the another device memory area to copy from + /// \param[in] nbytes number of bytes to copy + /// \return Status + Status CopyFromAnotherDevice(const std::shared_ptr<CudaContext>& src_ctx, + const int64_t position, const void* data, int64_t nbytes); + /// \brief Expose this device buffer as IPC memory which can be used in other processes /// \param[out] handle the exported IPC handle /// \return Status diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx index eac3daef4f5..fa84fc60ebf 100644 --- a/python/pyarrow/_cuda.pyx +++ b/python/pyarrow/_cuda.pyx @@ -561,15 +561,12 @@ cdef class CudaBuffer(Buffer): def copy_from_device(self, buf, int64_t position=0, int64_t nbytes=-1): """Copy data from device to device. - The destination device buffer must be pre-allocated within the - same context as source device buffer. - Parameters ---------- buf : CudaBuffer Specify source device buffer. position : int - Specify the starting position of the copy in devive buffer. + Specify the starting position of the copy in device buffer. Default: 0. nbytes : int Specify the number of bytes to copy. Default: -1 (all from @@ -581,9 +578,6 @@ cdef class CudaBuffer(Buffer): Returns ------- nbytes : int Number of bytes copied. """ - if self.context.handle != buf.context.handle: - raise ValueError('device source and destination buffers must be ' - 'within the same context') if position < 0 or position > self.size: raise ValueError('position argument is out-of-range') cdef int64_t nbytes_ cdef shared_ptr[CCudaBuffer] buf_ = pyarrow_unwrap_cudabuffer(buf) cdef int64_t position_ = position - with nogil: - check_status(self.cuda_buffer.get(). 
- CopyFromDevice(position_, buf_.get().data(), nbytes_)) + cdef shared_ptr[CCudaContext] src_ctx_ = pyarrow_unwrap_cudacontext( + buf.context) + if self.context.handle != buf.context.handle: + with nogil: + check_status(self.cuda_buffer.get(). + CopyFromAnotherDevice(src_ctx_, position_, + buf_.get().data(), nbytes_)) + else: + with nogil: + check_status(self.cuda_buffer.get(). + CopyFromDevice(position_, buf_.get().data(), + nbytes_)) return nbytes_ def export_for_ipc(self): diff --git a/python/pyarrow/includes/libarrow_cuda.pxd b/python/pyarrow/includes/libarrow_cuda.pxd index ef89d9cdd2e..ce091151518 100644 --- a/python/pyarrow/includes/libarrow_cuda.pxd +++ b/python/pyarrow/includes/libarrow_cuda.pxd @@ -71,6 +71,9 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil: int64_t nbytes) CStatus CopyFromDevice(const int64_t position, const void* data, int64_t nbytes) + CStatus CopyFromAnotherDevice(const shared_ptr[CCudaContext]& src_ctx, + const int64_t position, const void* data, + int64_t nbytes) CStatus ExportForIpc(shared_ptr[CCudaIpcMemHandle]* handle) shared_ptr[CCudaContext] context() const diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py index 4633df1f248..7c56e33274e 100644 --- a/python/pyarrow/tests/test_cuda.py +++ b/python/pyarrow/tests/test_cuda.py @@ -40,10 +40,12 @@ reason='CUDA IPC not supported in platform `%s`' % (platform)) global_context = None # for flake8 +global_context1 = None # for flake8 def setup_module(module): module.global_context = cuda.Context(0) + module.global_context1 = cuda.Context(cuda.Context.get_num_devices() - 1) def teardown_module(module): @@ -53,6 +55,7 @@ def teardown_module(module): def test_Context(): assert cuda.Context.get_num_devices() > 0 assert global_context.device_number == 0 + assert global_context1.device_number == cuda.Context.get_num_devices() - 1 with pytest.raises(ValueError, match=("device_number argument must " @@ -398,11 +401,18 @@ def 
test_copy_to_host(size): dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes) +@pytest.mark.parametrize("dest_ctx", ['same', 'another']) @pytest.mark.parametrize("size", [0, 1, 1000]) -def test_copy_from_device(size): +def test_copy_from_device(dest_ctx, size): arr, buf = make_random_buffer(size=size, target='device') lst = arr.tolist() - dbuf = buf.context.new_buffer(size) + if dest_ctx == 'another': + dest_ctx = global_context1 + if buf.context.device_number == dest_ctx.device_number: + pytest.skip("not a multi-GPU system") + else: + dest_ctx = buf.context + dbuf = dest_ctx.new_buffer(size) def put(*args, **kwargs): dbuf.copy_from_device(buf, *args, **kwargs)