Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions numba_cuda/numba/cuda/tests/cudadrv/test_init.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-2-Clause

import concurrent.futures
import multiprocessing as mp
import os

Expand All @@ -20,88 +21,83 @@ def cuInit_raising(arg):
# not assigned until we attempt to initialize - mock.patch.object cannot locate
# the non-existent original method, and so fails. Instead we patch
# driver.cuInit with our raising version prior to any attempt to initialize.
def cuInit_raising_test(result_queue):
def cuInit_raising_test():
driver.cuInit = cuInit_raising

success = False
msg = None

try:
# A CUDA operation that forces initialization of the device
cuda.device_array(1)
except CudaSupportError as e:
success = True
msg = e.msg
else:
success = False
msg = None

result_queue.put((success, msg))
return success, msg


# Similar to cuInit_raising_test above, but for testing that the string
# returned by cuda_error() is as expected.
def initialization_error_test(result_queue):
def initialization_error_test():
driver.cuInit = cuInit_raising

success = False
msg = None

try:
# A CUDA operation that forces initialization of the device
cuda.device_array(1)
except CudaSupportError:
success = True
else:
success = False

msg = cuda.cuda_error()
result_queue.put((success, msg))
return success, cuda.cuda_error()


# For testing the path where Driver.__init__() catches a CudaSupportError
def cuda_disabled_test(result_queue):
success = False
msg = None

def cuda_disabled_test():
try:
# A CUDA operation that forces initialization of the device
cuda.device_array(1)
except CudaSupportError as e:
success = True
msg = e.msg
else:
success = False
msg = None

result_queue.put((success, msg))
return success, msg


# Similar to cuda_disabled_test, but checks cuda.cuda_error() instead of the
# exception raised on initialization
def cuda_disabled_error_test(result_queue):
success = False
msg = None

def cuda_disabled_error_test():
try:
# A CUDA operation that forces initialization of the device
cuda.device_array(1)
except CudaSupportError:
success = True
else:
success = False

msg = cuda.cuda_error()
result_queue.put((success, msg))
return success, cuda.cuda_error()


@skip_on_cudasim("CUDA Simulator does not initialize driver")
class TestInit(CUDATestCase):
def _test_init_failure(self, target, expected):
# Run the initialization failure test in a separate subprocess
ctx = mp.get_context("spawn")
result_queue = ctx.Queue()
proc = ctx.Process(target=target, args=(result_queue,))
proc.start()
proc.join(30) # should complete within 30s
success, msg = result_queue.get()
with concurrent.futures.ProcessPoolExecutor(
mp_context=mp.get_context("spawn")
) as exe:
# should complete within 30s
success, msg = exe.submit(target).result(timeout=30)

# Ensure the child process raised an exception during initialization
# before checking the message
if not success:
self.fail("CudaSupportError not raised")
assert "CudaSupportError not raised" in msg

self.assertIn(expected, msg)
assert expected in msg

def test_init_failure_raising(self):
expected = "Error at driver init: CUDA_ERROR_UNKNOWN (999)"
Expand Down
38 changes: 13 additions & 25 deletions numba_cuda/numba/cuda/tests/cudadrv/test_runtime.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-2-Clause

import concurrent.futures
import multiprocessing
import os
from numba.cuda.testing import unittest


def set_visible_devices_and_check(q):
try:
from numba import cuda
import os
def set_visible_devices_and_check():
from numba import cuda
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0"
q.put(len(cuda.gpus.lst))
except: # noqa: E722
# Sentinel value for error executing test code
q.put(-1)
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
return len(cuda.gpus.lst)


class TestVisibleDevices(unittest.TestCase):
Expand All @@ -38,22 +35,13 @@ def test_visible_devices_set_after_import(self):
msg = "Cannot test when CUDA_VISIBLE_DEVICES already set"
self.skipTest(msg)

ctx = multiprocessing.get_context("spawn")
q = ctx.Queue()
p = ctx.Process(target=set_visible_devices_and_check, args=(q,))
p.start()
try:
visible_gpu_count = q.get()
finally:
p.join()

# Make an obvious distinction between an error running the test code
# and an incorrect number of GPUs in the list
msg = "Error running set_visible_devices_and_check"
self.assertNotEqual(visible_gpu_count, -1, msg=msg)

# The actual check that we see only one GPU
self.assertEqual(visible_gpu_count, 1)
with concurrent.futures.ProcessPoolExecutor(
mp_context=multiprocessing.get_context("spawn")
) as exe:
future = exe.submit(set_visible_devices_and_check)

visible_gpu_count = future.result()
assert visible_gpu_count == 1


if __name__ == "__main__":
Expand Down
41 changes: 15 additions & 26 deletions numba_cuda/numba/cuda/tests/cudapy/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,36 @@

import os
import multiprocessing as mp
import pytest
import concurrent.futures

import numpy as np

from numba import cuda
from numba.cuda.testing import skip_on_cudasim, CUDATestCase
import unittest

has_mp_get_context = hasattr(mp, "get_context")
is_unix = os.name == "posix"


def fork_test(q):
from numba.cuda.cudadrv.error import CudaDriverError

try:
cuda.to_device(np.arange(1))
except CudaDriverError as e:
q.put(e)
else:
q.put(None)


@skip_on_cudasim("disabled for cudasim")
class TestMultiprocessing(CUDATestCase):
@unittest.skipUnless(has_mp_get_context, "requires mp.get_context")
@unittest.skipUnless(is_unix, "requires Unix")
@unittest.skipUnless(hasattr(mp, "get_context"), "requires mp.get_context")
@unittest.skipUnless(os.name == "posix", "requires Unix")
def test_fork(self):
"""
Test fork detection.
"""
from numba.cuda.cudadrv.error import CudaDriverError

cuda.current_context() # force cuda initialize
# fork in process that also uses CUDA
ctx = mp.get_context("fork")
q = ctx.Queue()
proc = ctx.Process(target=fork_test, args=[q])
proc.start()
exc = q.get()
proc.join()
# there should be an exception raised in the child process
self.assertIsNotNone(exc)
self.assertIn("CUDA initialized before forking", str(exc))
with concurrent.futures.ProcessPoolExecutor(
mp_context=mp.get_context("fork")
) as exe:
future = exe.submit(cuda.to_device, np.arange(1))

with pytest.raises(
CudaDriverError, match="CUDA initialized before forking"
):
future.result()


if __name__ == "__main__":
Expand Down
48 changes: 12 additions & 36 deletions numba_cuda/numba/cuda/tests/cudapy/test_multithreads.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-2-Clause

import traceback
import threading
import multiprocessing
import numpy as np
Expand All @@ -13,12 +12,7 @@
)
import unittest

try:
from concurrent.futures import ThreadPoolExecutor
except ImportError:
has_concurrent_futures = False
else:
has_concurrent_futures = True
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


has_mp_get_context = hasattr(multiprocessing, "get_context")
Expand All @@ -41,52 +35,34 @@ def use_foo(x):
np.testing.assert_equal(ary, expected)


def spawn_process_entry(q):
try:
check_concurrent_compiling()
# Catch anything that goes wrong in the threads
except: # noqa: E722
msg = traceback.format_exc()
q.put("\n".join(["", "=" * 80, msg]))
else:
q.put(None)


@skip_under_cuda_memcheck("Hangs cuda-memcheck")
@skip_on_cudasim("disabled for cudasim")
class TestMultiThreadCompiling(CUDATestCase):
@unittest.skipIf(not has_concurrent_futures, "no concurrent.futures")
def test_concurrent_compiling(self):
check_concurrent_compiling()

@unittest.skipIf(not has_mp_get_context, "no multiprocessing.get_context")
def test_spawn_concurrent_compilation(self):
# force CUDA context init
cuda.get_current_device()
# use "spawn" to avoid inheriting the CUDA context
ctx = multiprocessing.get_context("spawn")

q = ctx.Queue()
p = ctx.Process(target=spawn_process_entry, args=(q,))
p.start()
try:
err = q.get()
finally:
p.join()
if err is not None:
raise AssertionError(err)
self.assertEqual(p.exitcode, 0, "test failed in child process")

with ProcessPoolExecutor(
# use "spawn" to avoid inheriting the CUDA context
mp_context=multiprocessing.get_context("spawn")
) as exe:
future = exe.submit(check_concurrent_compiling)
future.result()

def test_invalid_context_error_with_d2h(self):
def d2h(arr, out):
out[:] = arr.copy_to_host()

arr = np.arange(1, 4)
out = np.zeros_like(arr)
darr = cuda.to_device(arr)
th = threading.Thread(target=d2h, args=[darr, out])
th.start()
th.join()

with ThreadPoolExecutor() as exe:
exe.submit(d2h, cuda.to_device(arr), out)

np.testing.assert_equal(arr, out)

def test_invalid_context_error_with_d2d(self):
Expand Down
Loading