diff --git a/numba_cuda/numba/cuda/tests/cudapy/test_ipc.py b/numba_cuda/numba/cuda/tests/cudapy/test_ipc.py
index 0c01cdcd9..21256fe55 100644
--- a/numba_cuda/numba/cuda/tests/cudapy/test_ipc.py
+++ b/numba_cuda/numba/cuda/tests/cudapy/test_ipc.py
@@ -1,9 +1,11 @@
 # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: BSD-2-Clause
 
+import pytest
+import concurrent.futures
 import multiprocessing as mp
+import os
 import itertools
-import traceback
 import pickle
 
 import numpy as np
@@ -21,72 +23,55 @@
 import unittest
 
 
-def core_ipc_handle_test(the_work, result_queue):
-    try:
-        arr = the_work()
-        # Catch anything going wrong in the worker function
-    except:  # noqa: E722
-        # FAILED. propagate the exception as a string
-        succ = False
-        out = traceback.format_exc()
-    else:
-        # OK. send the ndarray back
-        succ = True
-        out = arr
-    result_queue.put((succ, out))
-
-
-def base_ipc_handle_test(handle, size, result_queue):
-    def the_work():
-        dtype = np.dtype(np.intp)
-        with cuda.open_ipc_array(
-            handle, shape=size // dtype.itemsize, dtype=dtype
-        ) as darr:
-            # copy the data to host
-            return darr.copy_to_host()
-
-    core_ipc_handle_test(the_work, result_queue)
-
-
-def serialize_ipc_handle_test(handle, result_queue):
-    def the_work():
-        dtype = np.dtype(np.intp)
-        darr = handle.open_array(
-            cuda.current_context(),
-            shape=handle.size // dtype.itemsize,
-            dtype=dtype,
-        )
-        # copy the data to host
-        arr = darr.copy_to_host()
-        handle.close()
-        return arr
-
-    core_ipc_handle_test(the_work, result_queue)
-
-
-def ipc_array_test(ipcarr, result_queue):
-    try:
-        with ipcarr as darr:
-            arr = darr.copy_to_host()
-        try:
-            # should fail to reopen
-            with ipcarr:
-                pass
-        except ValueError as e:
-            if str(e) != "IpcHandle is already opened":
-                raise AssertionError("invalid exception message")
-        else:
-            raise AssertionError("did not raise on reopen")
-        # Catch any exception so we can propagate it
-    except:  # noqa: E722
-        # FAILED. propagate the exception as a string
-        succ = False
-        out = traceback.format_exc()
-    else:
-        # OK. send the ndarray back
-        succ = True
-        out = arr
-    result_queue.put((succ, out))
+def base_ipc_handle_test(handle, size, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+    dtype = np.dtype(np.intp)
+    with cuda.open_ipc_array(
+        handle, shape=size // dtype.itemsize, dtype=dtype
+    ) as darr:
+        # copy the data to host
+        return darr.copy_to_host()
+
+
+def serialize_ipc_handle_test(handle, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+
+    dtype = np.dtype(np.intp)
+    darr = handle.open_array(
+        cuda.current_context(),
+        shape=handle.size // dtype.itemsize,
+        dtype=dtype,
+    )
+    # copy the data to host
+    arr = darr.copy_to_host()
+    handle.close()
+    return arr
+
+
+def ipc_array_test(ipcarr, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+    with ipcarr as darr:
+        arr = darr.copy_to_host()
+    with pytest.raises(ValueError, match="IpcHandle is already opened"):
+        with ipcarr:
+            pass
+    return arr
+
+
+class CUDAIpcTestCase(CUDATestCase):
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls.exe = concurrent.futures.ProcessPoolExecutor(
+            mp_context=mp.get_context("spawn")
+        )
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        cls.exe.shutdown()
+        del cls.exe
 
 
 @linux_only
@@ -94,7 +79,7 @@ def ipc_array_test(ipcarr, result_queue):
 @skip_on_cudasim("Ipc not available in CUDASIM")
 @skip_on_arm("CUDA IPC not supported on ARM in Numba")
 @skip_on_wsl2("CUDA IPC unreliable on WSL2; skipping IPC tests")
-class TestIpcMemory(CUDATestCase):
+class TestIpcMemory(CUDAIpcTestCase):
     def test_ipc_handle(self):
         # prepare data for IPC
         arr = np.arange(10, dtype=np.intp)
@@ -109,17 +94,11 @@ def test_ipc_handle(self):
         size = ipch.size
 
         # spawn new process for testing
-        ctx = mp.get_context("spawn")
-        result_queue = ctx.Queue()
-        args = (handle_bytes, size, result_queue)
-        proc = ctx.Process(target=base_ipc_handle_test, args=args)
-        proc.start()
-        succ, out = result_queue.get()
-        if not succ:
-            self.fail(out)
-        else:
-            np.testing.assert_equal(arr, out)
-        proc.join(3)
+        fut = self.exe.submit(
+            base_ipc_handle_test, handle_bytes, size, parent_pid=os.getpid()
+        )
+        out = fut.result(timeout=3)
+        np.testing.assert_equal(arr, out)
 
     def variants(self):
         # Test with no slicing and various different slices
@@ -152,17 +131,11 @@ def check_ipc_handle_serialization(self, index_arg=None, foreign=False):
         self.assertEqual(ipch_recon.handle.reserved, ipch.handle.reserved)
 
         # spawn new process for testing
-        ctx = mp.get_context("spawn")
-        result_queue = ctx.Queue()
-        args = (ipch, result_queue)
-        proc = ctx.Process(target=serialize_ipc_handle_test, args=args)
-        proc.start()
-        succ, out = result_queue.get()
-        if not succ:
-            self.fail(out)
-        else:
-            np.testing.assert_equal(expect, out)
-        proc.join(3)
+        fut = self.exe.submit(
+            serialize_ipc_handle_test, ipch, parent_pid=os.getpid()
+        )
+        out = fut.result(timeout=3)
+        np.testing.assert_equal(expect, out)
 
     def test_ipc_handle_serialization(self):
         for (
@@ -185,17 +158,9 @@ def check_ipc_array(self, index_arg=None, foreign=False):
         ipch = devarr.get_ipc_handle()
 
         # spawn new process for testing
-        ctx = mp.get_context("spawn")
-        result_queue = ctx.Queue()
-        args = (ipch, result_queue)
-        proc = ctx.Process(target=ipc_array_test, args=args)
-        proc.start()
-        succ, out = result_queue.get()
-        if not succ:
-            self.fail(out)
-        else:
-            np.testing.assert_equal(expect, out)
-        proc.join(3)
+        fut = self.exe.submit(ipc_array_test, ipch, parent_pid=os.getpid())
+        out = fut.result(timeout=3)
+        np.testing.assert_equal(expect, out)
 
     def test_ipc_array(self):
         for (
@@ -206,48 +171,33 @@ def test_ipc_array(self):
             self.check_ipc_array(index, foreign)
 
 
-def staged_ipc_handle_test(handle, device_num, result_queue):
-    def the_work():
-        with cuda.gpus[device_num]:
-            this_ctx = cuda.devices.get_context()
-            deviceptr = handle.open_staged(this_ctx)
-            arrsize = handle.size // np.dtype(np.intp).itemsize
-            hostarray = np.zeros(arrsize, dtype=np.intp)
-            cuda.driver.device_to_host(
-                hostarray,
-                deviceptr,
-                size=handle.size,
-            )
-            handle.close()
-        return hostarray
-
-    core_ipc_handle_test(the_work, result_queue)
-
-
-def staged_ipc_array_test(ipcarr, device_num, result_queue):
-    try:
-        with cuda.gpus[device_num]:
-            with ipcarr as darr:
-                arr = darr.copy_to_host()
-            try:
-                # should fail to reopen
-                with ipcarr:
-                    pass
-            except ValueError as e:
-                if str(e) != "IpcHandle is already opened":
-                    raise AssertionError("invalid exception message")
-            else:
-                raise AssertionError("did not raise on reopen")
-        # Catch any exception so we can propagate it
-    except:  # noqa: E722
-        # FAILED. propagate the exception as a string
-        succ = False
-        out = traceback.format_exc()
-    else:
-        # OK. send the ndarray back
-        succ = True
-        out = arr
-    result_queue.put((succ, out))
+def staged_ipc_handle_test(handle, device_num, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+    with cuda.gpus[device_num]:
+        this_ctx = cuda.devices.get_context()
+        deviceptr = handle.open_staged(this_ctx)
+        arrsize = handle.size // np.dtype(np.intp).itemsize
+        hostarray = np.zeros(arrsize, dtype=np.intp)
+        cuda.driver.device_to_host(
+            hostarray,
+            deviceptr,
+            size=handle.size,
+        )
+        handle.close()
+    return hostarray
+
+
+def staged_ipc_array_test(ipcarr, device_num, parent_pid):
+    pid = os.getpid()
+    assert pid != parent_pid
+    with cuda.gpus[device_num]:
+        with ipcarr as darr:
+            arr = darr.copy_to_host()
+        with pytest.raises(ValueError, match="IpcHandle is already opened"):
+            with ipcarr:
+                pass
+    return arr
 
 
 @linux_only
@@ -255,16 +205,11 @@ def staged_ipc_array_test(ipcarr, device_num, result_queue):
 @skip_on_cudasim("Ipc not available in CUDASIM")
 @skip_on_arm("CUDA IPC not supported on ARM in Numba")
 @skip_on_wsl2("CUDA IPC unreliable on WSL2; skipping IPC tests")
-class TestIpcStaged(CUDATestCase):
+class TestIpcStaged(CUDAIpcTestCase):
     def test_staged(self):
         # prepare data for IPC
         arr = np.arange(10, dtype=np.intp)
         devarr = cuda.to_device(arr)
-
-        # spawn new process for testing
-        mpctx = mp.get_context("spawn")
-        result_queue = mpctx.Queue()
-
         # create IPC handle
         ctx = cuda.current_context()
         ipch = ctx.get_ipc_handle(devarr.gpu_data)
@@ -276,16 +221,16 @@ def test_staged(self):
         self.assertEqual(ipch_recon.size, ipch.size)
 
         # Test on every CUDA devices
-        for device_num in range(len(cuda.gpus)):
-            args = (ipch, device_num, result_queue)
-            proc = mpctx.Process(target=staged_ipc_handle_test, args=args)
-            proc.start()
-            succ, out = result_queue.get()
-            proc.join(3)
-            if not succ:
-                self.fail(out)
-            else:
-                np.testing.assert_equal(arr, out)
+        ngpus = len(cuda.gpus)
+        futures = [
+            self.exe.submit(
+                staged_ipc_handle_test, ipch, device_num, parent_pid=os.getpid()
+            )
+            for device_num in range(ngpus)
+        ]
+
+        for fut in concurrent.futures.as_completed(futures, timeout=3 * ngpus):
+            np.testing.assert_equal(arr, fut.result())
 
     def test_ipc_array(self):
         for device_num in range(len(cuda.gpus)):
@@ -295,17 +240,11 @@ def test_ipc_array(self):
             ipch = devarr.get_ipc_handle()
 
             # spawn new process for testing
-            ctx = mp.get_context("spawn")
-            result_queue = ctx.Queue()
-            args = (ipch, device_num, result_queue)
-            proc = ctx.Process(target=staged_ipc_array_test, args=args)
-            proc.start()
-            succ, out = result_queue.get()
-            proc.join(3)
-            if not succ:
-                self.fail(out)
-            else:
-                np.testing.assert_equal(arr, out)
+            fut = self.exe.submit(
+                staged_ipc_array_test, ipch, device_num, parent_pid=os.getpid()
+            )
+            out = fut.result(timeout=3)
+            np.testing.assert_equal(arr, out)
 
 
 @windows_only
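The load-bearing change in this patch is the move from hand-rolled mp.Process plus result_queue plumbing to a shared ProcessPoolExecutor: a future re-raises the worker's exception in the parent and enforces a timeout in result(), which is what allows core_ipc_handle_test and the (succ, out) protocol to be deleted outright. A minimal, self-contained sketch of that pattern, separate from the patch (the worker function and values below are illustrative only, not part of the diff):

import concurrent.futures
import multiprocessing as mp
import os


def worker(parent_pid):
    # Runs in a spawned child process; an uncaught exception here is
    # pickled and re-raised in the parent when fut.result() is called,
    # so no manual (succ, traceback_string) queue protocol is needed.
    assert os.getpid() != parent_pid
    return os.getpid()


if __name__ == "__main__":
    exe = concurrent.futures.ProcessPoolExecutor(
        mp_context=mp.get_context("spawn")
    )
    try:
        fut = exe.submit(worker, parent_pid=os.getpid())
        # result(timeout=3) replaces result_queue.get() + proc.join(3):
        # it raises on worker failure and on timeout.
        print(fut.result(timeout=3))
    finally:
        exe.shutdown()

The same property covers the reopen checks: pytest.raises(ValueError, match=...) inside the worker replaces the old try/except/AssertionError dance, because a failed expectation simply propagates through the future to the test process.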