Skip to content

Commit

Permalink
CI: revamp parallel marker
Browse files Browse the repository at this point in the history
  • Loading branch information
mloubout committed Apr 5, 2024
1 parent cb24675 commit dfbb848
Show file tree
Hide file tree
Showing 16 changed files with 183 additions and 174 deletions.
89 changes: 49 additions & 40 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def EVAL(exprs, *args):
return processed[0] if isinstance(exprs, str) else processed


def parallel(item):
def parallel(item, m):
"""
Run a test in parallel. Readapted from:
Expand All @@ -131,47 +131,45 @@ def parallel(item):
mpi_exec = 'mpiexec'
mpi_distro = sniff_mpi_distro(mpi_exec)

marker = item.get_closest_marker("parallel")
mode = as_tuple(marker.kwargs.get("mode", 2))
for m in mode:
# Parse the `mode`
if isinstance(m, int):
nprocs = m
scheme = 'basic'
# Parse the `mode`
if isinstance(m, int):
nprocs = m
scheme = 'basic'
else:
if len(m) == 2:
nprocs, scheme = m
else:
if len(m) == 2:
nprocs, scheme = m
else:
raise ValueError("Can't run test: unexpected mode `%s`" % m)
raise ValueError("Can't run test: unexpected mode `%s`" % m)

pyversion = sys.executable
# Only spew tracebacks on rank 0.
# Run xfailing tests to ensure that errors are reported to calling process
if item.cls is not None:
testname = "%s::%s::%s" % (item.fspath, item.cls.__name__, item.name)
else:
testname = "%s::%s" % (item.fspath, item.name)
args = ["-n", "1", pyversion, "-m", "pytest", "--runxfail", "-s",
"-q", testname]
if nprocs > 1:
args.extend([":", "-n", "%d" % (nprocs - 1), pyversion, "-m", "pytest",
"--runxfail", "--tb=no", "-q", testname])
# OpenMPI requires an explicit flag for oversubscription. We need it as some
# of the MPI tests will spawn lots of processes
if mpi_distro == 'OpenMPI':
call = [mpi_exec, '--oversubscribe', '--timeout', '300'] + args
else:
call = [mpi_exec] + args
pyversion = sys.executable
# Only spew tracebacks on rank 0.
# Run xfailing tests to ensure that errors are reported to calling process
if item.cls is not None:
testname = "%s::%s::%s" % (item.fspath, item.cls.__name__, item.name)
else:
testname = "%s::%s" % (item.fspath, item.name)
args = ["-n", "1", pyversion, "-m", "pytest", "--runxfail", "-s",
"-q", testname]
if nprocs > 1:
args.extend([":", "-n", "%d" % (nprocs - 1), pyversion, "-m", "pytest", "-s",
"--runxfail", "--tb=no", "-q", testname])
# OpenMPI requires an explicit flag for oversubscription. We need it as some
# of the MPI tests will spawn lots of processes
if mpi_distro == 'OpenMPI':
call = [mpi_exec, '--oversubscribe', '--timeout', '300'] + args
else:
call = [mpi_exec] + args

# Tell the MPI ranks that they are running a parallel test
os.environ['DEVITO_MPI'] = scheme
try:
check_call(call)
return True
except:
return False
finally:
os.environ['DEVITO_MPI'] = '0'
# Tell the MPI ranks that they are running a parallel test
os.environ['DEVITO_MPI'] = scheme
try:
check_call(call)
res = True
except:
res = False
finally:
os.environ['DEVITO_MPI'] = '0'
return res


def pytest_configure(config):
Expand Down Expand Up @@ -205,15 +203,26 @@ def pytest_runtest_setup(item):
setattr(item, '_obj', dummy_test)


def pytest_generate_tests(metafunc):
    """
    Expand the custom ``parallel`` marker into a ``mode`` parametrization so
    that each requested MPI mode runs as its own test case, instead of a
    single test covering all modes.

    Each mode is either an int (number of ranks, 'basic' scheme) or a
    ``(nprocs, scheme)`` tuple, matching what ``parallel()`` parses.
    """
    if 'mode' in metafunc.fixturenames:
        markers = metafunc.definition.iter_markers()
        for marker in markers:
            if marker.name == 'parallel':
                modes = marker.kwargs.get('mode', 2)
                # A list/tuple of modes must yield one test per mode.
                # Wrapping the raw value as `[modes]` would hand the entire
                # list to a single test, which `parallel()` cannot parse
                # (it would try `nprocs, scheme = mode_tuple, mode_tuple`).
                if not isinstance(modes, (list, tuple)):
                    modes = [modes]
                metafunc.parametrize("mode", list(modes))


def pytest_runtest_call(item):
partest = os.environ.get('DEVITO_MPI', 0)
try:
partest = int(partest)
except ValueError:
pass

if item.get_closest_marker("parallel") and not partest:
# Spawn parallel processes to run test
passed = parallel(item)
passed = parallel(item, item.funcargs['mode'])
if not passed:
pytest.fail(f"{item} failed in parallel execution")
else:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_autotuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def test_discarding_runs():


@pytest.mark.parallel(mode=[(2, 'diag'), (2, 'full')])
def test_at_w_mpi():
def test_at_w_mpi(mode):
"""Make sure autotuning works in presence of MPI. MPI ranks work
in isolation to determine the best block size, locally."""
grid = Grid(shape=(8, 8))
Expand Down
2 changes: 1 addition & 1 deletion tests/test_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_bench(mode, problem, op):

@pytest.mark.parallel(mode=2)
@switchconfig(profiling='advanced')
def test_run_mpi():
def test_run_mpi(mode):
"""
Test the `run` mode over MPI, with all key arguments used.
"""
Expand Down
8 changes: 4 additions & 4 deletions tests/test_builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_assign_subsampled_timefunction(self):
assert np.all(f.data == 1)

@pytest.mark.parallel(mode=4)
def test_assign_parallel(self):
def test_assign_parallel(self, mode):
a = np.arange(64).reshape((8, 8))
grid = Grid(shape=a.shape)

Expand Down Expand Up @@ -174,7 +174,7 @@ def test_gs_2d_float(self, sigma):
assert np.amax(np.abs(sp_smoothed - np.array(dv_smoothed))) <= 1e-5

@pytest.mark.parallel(mode=[(4, 'full')])
def test_gs_parallel(self):
def test_gs_parallel(self, mode):
a = np.arange(64).reshape((8, 8))
grid = Grid(shape=a.shape)

Expand Down Expand Up @@ -236,7 +236,7 @@ def test_nbl_zero(self):
assert np.all(a[:] - np.array(f.data[:]) == 0)

@pytest.mark.parallel(mode=4)
def test_if_parallel(self):
def test_if_parallel(self, mode):
a = np.arange(36).reshape((6, 6))
grid = Grid(shape=(18, 18))
x, y = grid.dimensions
Expand Down Expand Up @@ -292,7 +292,7 @@ def test_if_halo(self, ndim, nbl):

@pytest.mark.parametrize('nbl', [0, 2])
@pytest.mark.parallel(mode=4)
def test_if_halo_mpi(self, nbl):
def test_if_halo_mpi(self, nbl, mode):
"""
Test that FD halo is padded as well.
"""
Expand Down
44 changes: 22 additions & 22 deletions tests/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ class TestDataDistributed(object):
"""

@pytest.mark.parallel(mode=4)
def test_localviews(self):
def test_localviews(self, mode):
grid = Grid(shape=(4, 4))
x, y = grid.dimensions
glb_pos_map = grid.distributor.glb_pos_map
Expand Down Expand Up @@ -520,7 +520,7 @@ def test_localviews(self):
assert np.all(u.data_ro_with_halo._local[2] == 0.)

@pytest.mark.parallel(mode=4)
def test_trivial_insertion(self):
def test_trivial_insertion(self, mode):
grid = Grid(shape=(4, 4))
u = Function(name='u', grid=grid, space_order=0)
v = Function(name='v', grid=grid, space_order=1)
Expand All @@ -536,7 +536,7 @@ def test_trivial_insertion(self):
assert np.all(v.data_with_halo._local == 1.)

@pytest.mark.parallel(mode=4)
def test_indexing(self):
def test_indexing(self, mode):
grid = Grid(shape=(4, 4))
x, y = grid.dimensions
glb_pos_map = grid.distributor.glb_pos_map
Expand Down Expand Up @@ -567,7 +567,7 @@ def test_indexing(self):
assert np.all(u.data[:, 2] == [myrank, myrank])

@pytest.mark.parallel(mode=4)
def test_slicing(self):
def test_slicing(self, mode):
grid = Grid(shape=(4, 4))
x, y = grid.dimensions
glb_pos_map = grid.distributor.glb_pos_map
Expand All @@ -594,7 +594,7 @@ def test_slicing(self):
assert u.data[:2, 2:].size == u.data[2:, :2].size == u.data[:2, :2].size == 0

@pytest.mark.parallel(mode=4)
def test_slicing_ns(self):
def test_slicing_ns(self, mode):
# Test slicing with a negative step
grid = Grid(shape=(4, 4))
x, y = grid.dimensions
Expand All @@ -619,7 +619,7 @@ def test_slicing_ns(self):
assert np.all(u.data == [[5, 4], [1, 0]])

@pytest.mark.parallel(mode=4)
def test_getitem(self):
def test_getitem(self, mode):
# __getitem__ mpi slicing tests:
grid = Grid(shape=(8, 8))
x, y = grid.dimensions
Expand Down Expand Up @@ -697,7 +697,7 @@ def test_getitem(self):
assert np.all(result4 == [[28, 27, 26]])

@pytest.mark.parallel(mode=4)
def test_big_steps(self):
def test_big_steps(self, mode):
# Test slicing with a step size > 1
grid = Grid(shape=(8, 8))
x, y = grid.dimensions
Expand Down Expand Up @@ -749,7 +749,7 @@ def test_big_steps(self):
assert np.all(r3 == [[0]])

@pytest.mark.parallel(mode=4)
def test_setitem(self):
def test_setitem(self, mode):
# __setitem__ mpi slicing tests
grid = Grid(shape=(12, 12))
x, y = grid.dimensions
Expand Down Expand Up @@ -810,7 +810,7 @@ def test_setitem(self):
[0, 0, 0, 0, 0, 0]])

@pytest.mark.parallel(mode=4)
def test_hd_slicing(self):
def test_hd_slicing(self, mode):
# Test higher dimension slices
grid = Grid(shape=(4, 4, 4))
x, y, z = grid.dimensions
Expand Down Expand Up @@ -889,7 +889,7 @@ def test_hd_slicing(self):
[63]])

@pytest.mark.parallel(mode=4)
def test_niche_slicing(self):
def test_niche_slicing(self, mode):
grid0 = Grid(shape=(8, 8))
x0, y0 = grid0.dimensions
glb_pos_map0 = grid0.distributor.glb_pos_map
Expand Down Expand Up @@ -1029,7 +1029,7 @@ def test_niche_slicing(self):
((8, 8, 8), (slice(None, None, 1), 5, slice(None, None, 1)),
(slice(None, None, 1), 1, slice(None, None, 1)),
(slice(None, None, 1), 7, slice(None, None, 1)))])
def test_niche_slicing2(self, shape, slice0, slice1, slice2):
def test_niche_slicing2(self, shape, slice0, slice1, slice2, mode):
grid = Grid(shape=shape)
f = Function(name='f', grid=grid)
f.data[:] = 1
Expand Down Expand Up @@ -1063,7 +1063,7 @@ def test_empty_slicing(self):
assert(g.data[1:1, 0:0, 1:1].shape == (0, 0, 0))

@pytest.mark.parallel(mode=4)
def test_neg_start_stop(self):
def test_neg_start_stop(self, mode):
grid0 = Grid(shape=(8, 8))
f = Function(name='f', grid=grid0, space_order=0, dtype=np.int32)
dat = np.arange(64, dtype=np.int32)
Expand Down Expand Up @@ -1094,7 +1094,7 @@ def test_neg_start_stop(self):
assert np.count_nonzero(h.data[:]) == 0

@pytest.mark.parallel(mode=4)
def test_indexing_in_views(self):
def test_indexing_in_views(self, mode):
grid = Grid(shape=(4, 4))
x, y = grid.dimensions
glb_pos_map = grid.distributor.glb_pos_map
Expand Down Expand Up @@ -1158,7 +1158,7 @@ def test_indexing_in_views(self):
assert view2.size == 0

@pytest.mark.parallel(mode=4)
def test_from_replicated_to_distributed(self):
def test_from_replicated_to_distributed(self, mode):
shape = (4, 4)
grid = Grid(shape=shape)
x, y = grid.dimensions
Expand Down Expand Up @@ -1207,7 +1207,7 @@ def test_from_replicated_to_distributed(self):
assert False

@pytest.mark.parallel(mode=4)
def test_misc_setup(self):
def test_misc_setup(self, mode):
"""Test setup of Functions with mixed distributed/replicated Dimensions."""
grid = Grid(shape=(4, 4))
_, y = grid.dimensions
Expand Down Expand Up @@ -1248,7 +1248,7 @@ def test_misc_setup(self):
assert True

@pytest.mark.parallel(mode=4)
def test_misc_data(self):
def test_misc_data(self, mode):
"""
Test data insertion/indexing for Functions with mixed
distributed/replicated Dimensions.
Expand Down Expand Up @@ -1294,7 +1294,7 @@ def test_misc_data(self):
(slice(None, None, -1), slice(0, 1, 1), slice(None, None, -1)),
(0, slice(None, None, -1), slice(None, None, -1)),
(slice(0, 1, 1), slice(None, None, -1), slice(None, None, -1))])
def test_inversions(self, gslice):
def test_inversions(self, gslice, mode):
""" Test index flipping along different axes."""
nx = 8
ny = 8
Expand Down Expand Up @@ -1337,7 +1337,7 @@ def test_inversions(self, gslice):
assert res.shape == vdat[tuple(sl)].shape

@pytest.mark.parallel(mode=4)
def test_setitem_shorthands(self):
def test_setitem_shorthands(self, mode):
# Test setitem with various slicing shorthands
nx = 8
ny = 8
Expand Down Expand Up @@ -1387,7 +1387,7 @@ class TestDataGather(object):

@pytest.mark.parallel(mode=4)
@pytest.mark.parametrize('rank', [0, 1, 2, 3])
def test_simple_gather(self, rank):
def test_simple_gather(self, rank, mode):
""" Test a simple gather on various ranks."""
grid = Grid(shape=(10, 10), extent=(9, 9))
f = Function(name='f', grid=grid, dtype=np.int32)
Expand All @@ -1408,7 +1408,7 @@ def test_simple_gather(self, rank):
(None, None, -2),
(1, 8, 3),
((0, 4), None, (2, 1))])
def test_sliced_gather_2D(self, start, stop, step):
def test_sliced_gather_2D(self, start, stop, step, mode):
""" Test gather for various 2D slices."""
grid = Grid(shape=(10, 10), extent=(9, 9))
f = Function(name='f', grid=grid, dtype=np.int32)
Expand Down Expand Up @@ -1442,7 +1442,7 @@ def test_sliced_gather_2D(self, start, stop, step):
(None, None, -2),
(1, 8, 3),
((0, 4, 4), None, (2, 1, 1))])
def test_sliced_gather_3D(self, start, stop, step):
def test_sliced_gather_3D(self, start, stop, step, mode):
""" Test gather for various 3D slices."""
grid = Grid(shape=(10, 10, 10), extent=(9, 9, 9))
f = Function(name='f', grid=grid, dtype=np.int32)
Expand All @@ -1469,7 +1469,7 @@ def test_sliced_gather_3D(self, start, stop, step):
assert ans == np.array(None)

@pytest.mark.parallel(mode=[4, 6])
def test_gather_time_function(self):
def test_gather_time_function(self, mode):
""" Test gathering of TimeFunction objects. """
grid = Grid(shape=(11, 11))
f = TimeFunction(name='f', grid=grid, save=11)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_dle.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_cache_blocking_structure_subdims():


@pytest.mark.parallel(mode=[(1, 'full')]) # Shortcut to put loops in nested efuncs
def test_cache_blocking_structure_distributed():
def test_cache_blocking_structure_distributed(mode):
"""
Test cache blocking in multiple nested elemental functions.
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/test_dse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2801,7 +2801,7 @@ def test_fullopt(self):

@switchconfig(profiling='advanced')
@pytest.mark.parallel(mode=[(1, 'full')])
def test_fullopt_w_mpi(self):
def test_fullopt_w_mpi(self, mode):
tti_noopt = self.tti_operator(opt=None)
rec0, u0, v0, _ = tti_noopt.forward()
tti_agg = self.tti_operator(opt='advanced')
Expand Down
Loading

0 comments on commit dfbb848

Please sign in to comment.