numba_cuda/numba/cuda/tests/cudapy/test_ipc.py (265 changes: 102 additions & 163 deletions)
@@ -1,9 +1,11 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-2-Clause

+import pytest
+import concurrent.futures
import multiprocessing as mp
+import os
import itertools
-import traceback
import pickle

import numpy as np
@@ -21,80 +23,63 @@
import unittest


-def core_ipc_handle_test(the_work, result_queue):
-    try:
-        arr = the_work()
-    # Catch anything going wrong in the worker function
-    except:  # noqa: E722
-        # FAILED. propagate the exception as a string
-        succ = False
-        out = traceback.format_exc()
-    else:
-        # OK. send the ndarray back
-        succ = True
-        out = arr
-    result_queue.put((succ, out))
-
-
-def base_ipc_handle_test(handle, size, result_queue):
-    def the_work():
-        dtype = np.dtype(np.intp)
-        with cuda.open_ipc_array(
-            handle, shape=size // dtype.itemsize, dtype=dtype
-        ) as darr:
-            # copy the data to host
-            return darr.copy_to_host()
-
-    core_ipc_handle_test(the_work, result_queue)
-
-
-def serialize_ipc_handle_test(handle, result_queue):
-    def the_work():
-        dtype = np.dtype(np.intp)
-        darr = handle.open_array(
-            cuda.current_context(),
-            shape=handle.size // dtype.itemsize,
-            dtype=dtype,
-        )
-        # copy the data to host
-        arr = darr.copy_to_host()
-        handle.close()
-        return arr
-
-    core_ipc_handle_test(the_work, result_queue)
-
-
-def ipc_array_test(ipcarr, result_queue):
-    try:
-        with ipcarr as darr:
-            arr = darr.copy_to_host()
-            try:
-                # should fail to reopen
-                with ipcarr:
-                    pass
-            except ValueError as e:
-                if str(e) != "IpcHandle is already opened":
-                    raise AssertionError("invalid exception message")
-            else:
-                raise AssertionError("did not raise on reopen")
-    # Catch any exception so we can propagate it
-    except:  # noqa: E722
-        # FAILED. propagate the exception as a string
-        succ = False
-        out = traceback.format_exc()
-    else:
-        # OK. send the ndarray back
-        succ = True
-        out = arr
-    result_queue.put((succ, out))
+def base_ipc_handle_test(handle, size, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
[Review comment, Contributor Author]: Passing the parent_pid here isn't strictly necessary and I can remove it if desired; I was just verifying my understanding of what was happening with ProcessPoolExecutor.

+    dtype = np.dtype(np.intp)
+    with cuda.open_ipc_array(
+        handle, shape=size // dtype.itemsize, dtype=dtype
+    ) as darr:
+        # copy the data to host
+        return darr.copy_to_host()
+
+
+def serialize_ipc_handle_test(handle, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+
+    dtype = np.dtype(np.intp)
+    darr = handle.open_array(
+        cuda.current_context(),
+        shape=handle.size // dtype.itemsize,
+        dtype=dtype,
+    )
+    # copy the data to host
+    arr = darr.copy_to_host()
+    handle.close()
+    return arr
+
+
+def ipc_array_test(ipcarr, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+    with ipcarr as darr:
+        arr = darr.copy_to_host()
+        with pytest.raises(ValueError, match="IpcHandle is already opened"):
+            with ipcarr:
+                pass
+    return arr
+
+
+class CUDAIpcTestCase(CUDATestCase):
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls.exe = concurrent.futures.ProcessPoolExecutor(
+            mp_context=mp.get_context("spawn")
+        )
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        cls.exe.shutdown()
+        del cls.exe


@linux_only
@skip_under_cuda_memcheck("Hangs cuda-memcheck")
@skip_on_cudasim("Ipc not available in CUDASIM")
@skip_on_arm("CUDA IPC not supported on ARM in Numba")
@skip_on_wsl2("CUDA IPC unreliable on WSL2; skipping IPC tests")
-class TestIpcMemory(CUDATestCase):
+class TestIpcMemory(CUDAIpcTestCase):
    def test_ipc_handle(self):
        # prepare data for IPC
        arr = np.arange(10, dtype=np.intp)
@@ -109,17 +94,11 @@ def test_ipc_handle(self):
        size = ipch.size

        # spawn new process for testing
-        ctx = mp.get_context("spawn")
-        result_queue = ctx.Queue()
-        args = (handle_bytes, size, result_queue)
-        proc = ctx.Process(target=base_ipc_handle_test, args=args)
-        proc.start()
-        succ, out = result_queue.get()
-        if not succ:
-            self.fail(out)
-        else:
-            np.testing.assert_equal(arr, out)
-        proc.join(3)
+        fut = self.exe.submit(
+            base_ipc_handle_test, handle_bytes, size, parent_pid=os.getpid()
+        )
+        out = fut.result(timeout=3)
+        np.testing.assert_equal(arr, out)

    def variants(self):
        # Test with no slicing and various different slices
@@ -152,17 +131,11 @@ def check_ipc_handle_serialization(self, index_arg=None, foreign=False):
        self.assertEqual(ipch_recon.handle.reserved, ipch.handle.reserved)

        # spawn new process for testing
-        ctx = mp.get_context("spawn")
-        result_queue = ctx.Queue()
-        args = (ipch, result_queue)
-        proc = ctx.Process(target=serialize_ipc_handle_test, args=args)
-        proc.start()
-        succ, out = result_queue.get()
-        if not succ:
-            self.fail(out)
-        else:
-            np.testing.assert_equal(expect, out)
-        proc.join(3)
+        fut = self.exe.submit(
+            serialize_ipc_handle_test, ipch, parent_pid=os.getpid()
+        )
+        out = fut.result(timeout=3)
+        np.testing.assert_equal(expect, out)

    def test_ipc_handle_serialization(self):
        for (
@@ -185,17 +158,9 @@ def check_ipc_array(self, index_arg=None, foreign=False):
        ipch = devarr.get_ipc_handle()

        # spawn new process for testing
-        ctx = mp.get_context("spawn")
-        result_queue = ctx.Queue()
-        args = (ipch, result_queue)
-        proc = ctx.Process(target=ipc_array_test, args=args)
-        proc.start()
-        succ, out = result_queue.get()
-        if not succ:
-            self.fail(out)
-        else:
-            np.testing.assert_equal(expect, out)
-        proc.join(3)
+        fut = self.exe.submit(ipc_array_test, ipch, parent_pid=os.getpid())
+        out = fut.result(timeout=3)
+        np.testing.assert_equal(expect, out)

    def test_ipc_array(self):
        for (
@@ -206,65 +171,45 @@ def test_ipc_array(self):
            self.check_ipc_array(index, foreign)


-def staged_ipc_handle_test(handle, device_num, result_queue):
-    def the_work():
-        with cuda.gpus[device_num]:
-            this_ctx = cuda.devices.get_context()
-            deviceptr = handle.open_staged(this_ctx)
-            arrsize = handle.size // np.dtype(np.intp).itemsize
-            hostarray = np.zeros(arrsize, dtype=np.intp)
-            cuda.driver.device_to_host(
-                hostarray,
-                deviceptr,
-                size=handle.size,
-            )
-            handle.close()
-        return hostarray
-
-    core_ipc_handle_test(the_work, result_queue)
+def staged_ipc_handle_test(handle, device_num, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+    with cuda.gpus[device_num]:
+        this_ctx = cuda.devices.get_context()
+        deviceptr = handle.open_staged(this_ctx)
+        arrsize = handle.size // np.dtype(np.intp).itemsize
+        hostarray = np.zeros(arrsize, dtype=np.intp)
+        cuda.driver.device_to_host(
+            hostarray,
+            deviceptr,
+            size=handle.size,
+        )
+        handle.close()
+    return hostarray


-def staged_ipc_array_test(ipcarr, device_num, result_queue):
-    try:
-        with cuda.gpus[device_num]:
-            with ipcarr as darr:
-                arr = darr.copy_to_host()
-                try:
-                    # should fail to reopen
-                    with ipcarr:
-                        pass
-                except ValueError as e:
-                    if str(e) != "IpcHandle is already opened":
-                        raise AssertionError("invalid exception message")
-                else:
-                    raise AssertionError("did not raise on reopen")
-    # Catch any exception so we can propagate it
-    except:  # noqa: E722
-        # FAILED. propagate the exception as a string
-        succ = False
-        out = traceback.format_exc()
-    else:
-        # OK. send the ndarray back
-        succ = True
-        out = arr
-    result_queue.put((succ, out))
+def staged_ipc_array_test(ipcarr, device_num, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+    with cuda.gpus[device_num]:
+        with ipcarr as darr:
+            arr = darr.copy_to_host()
+            with pytest.raises(ValueError, match="IpcHandle is already opened"):
+                with ipcarr:
+                    pass
+    return arr


@linux_only
@skip_under_cuda_memcheck("Hangs cuda-memcheck")
@skip_on_cudasim("Ipc not available in CUDASIM")
@skip_on_arm("CUDA IPC not supported on ARM in Numba")
@skip_on_wsl2("CUDA IPC unreliable on WSL2; skipping IPC tests")
-class TestIpcStaged(CUDATestCase):
+class TestIpcStaged(CUDAIpcTestCase):
    def test_staged(self):
        # prepare data for IPC
        arr = np.arange(10, dtype=np.intp)
        devarr = cuda.to_device(arr)

-        # spawn new process for testing
-        mpctx = mp.get_context("spawn")
-        result_queue = mpctx.Queue()
-
        # create IPC handle
        ctx = cuda.current_context()
        ipch = ctx.get_ipc_handle(devarr.gpu_data)
@@ -276,16 +221,16 @@ def test_staged(self):
        self.assertEqual(ipch_recon.size, ipch.size)

        # Test on every CUDA devices
-        for device_num in range(len(cuda.gpus)):
-            args = (ipch, device_num, result_queue)
-            proc = mpctx.Process(target=staged_ipc_handle_test, args=args)
-            proc.start()
-            succ, out = result_queue.get()
-            proc.join(3)
-            if not succ:
-                self.fail(out)
-            else:
-                np.testing.assert_equal(arr, out)
+        ngpus = len(cuda.gpus)
+        futures = [
+            self.exe.submit(
+                staged_ipc_handle_test, ipch, device_num, parent_pid=os.getpid()
+            )
+            for device_num in range(ngpus)
+        ]
+
+        for fut in concurrent.futures.as_completed(futures, timeout=3 * ngpus):
+            np.testing.assert_equal(arr, fut.result())

    def test_ipc_array(self):
        for device_num in range(len(cuda.gpus)):
@@ -295,17 +240,11 @@ def test_ipc_array(self):
            ipch = devarr.get_ipc_handle()

            # spawn new process for testing
-            ctx = mp.get_context("spawn")
-            result_queue = ctx.Queue()
-            args = (ipch, device_num, result_queue)
-            proc = ctx.Process(target=staged_ipc_array_test, args=args)
-            proc.start()
-            succ, out = result_queue.get()
-            proc.join(3)
-            if not succ:
-                self.fail(out)
-            else:
-                np.testing.assert_equal(arr, out)
+            fut = self.exe.submit(
+                staged_ipc_array_test, ipch, device_num, parent_pid=os.getpid()
+            )
+            out = fut.result(timeout=3)
+            np.testing.assert_equal(arr, out)


@windows_only
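Note on the pattern used throughout this diff: a spawn-context ProcessPoolExecutor runs each helper in a worker process and returns its pickled result through a Future, and Future.result() re-raises any worker exception in the parent, so the old core_ipc_handle_test wrapper that shuttled a (succ, out) tuple through a result_queue is no longer needed. The sketch below is illustrative only and not part of the PR; worker is a hypothetical stand-in for the module-level test helpers, with no CUDA dependency.

# Minimal sketch of the executor pattern adopted by this PR.
import concurrent.futures
import multiprocessing as mp
import os


def worker(parent_pid):
    # Executes in a worker process, so its PID differs from the
    # parent's; this is the property that the `assert pid != parent_pid`
    # lines in the test helpers verify.
    pid = os.getpid()
    assert pid != parent_pid
    return pid


if __name__ == "__main__":
    # Same construction as CUDAIpcTestCase.setUpClass: a pool whose
    # worker processes are started with the "spawn" method.
    exe = concurrent.futures.ProcessPoolExecutor(
        mp_context=mp.get_context("spawn")
    )
    fut = exe.submit(worker, parent_pid=os.getpid())
    # result() returns the worker's pickled return value and re-raises
    # any exception raised in the worker, so no (succ, out) tuple has
    # to be shuttled through a result_queue by hand.
    child_pid = fut.result(timeout=30)
    assert child_pid != os.getpid()
    exe.shutdown()

Because the executor is created once in setUpClass and reused, worker processes persist across submissions rather than being spawned per test; the parent_pid assertions still hold in a reused worker, since a worker's PID never equals the parent's.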