Skip to content

Commit

Permalink
Merge pull request #2347 from devitocodes/mpi-conftest
Browse files Browse the repository at this point in the history
CI: revamp parallel marker
  • Loading branch information
FabioLuporini authored Apr 8, 2024
2 parents 981661f + 65ee2e1 commit c836e03
Show file tree
Hide file tree
Showing 16 changed files with 189 additions and 203 deletions.
121 changes: 54 additions & 67 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def EVAL(exprs, *args):
return processed[0] if isinstance(exprs, str) else processed


def parallel(item):
def parallel(item, m):
"""
Run a test in parallel. Readapted from:
Expand All @@ -131,47 +131,44 @@ def parallel(item):
mpi_exec = 'mpiexec'
mpi_distro = sniff_mpi_distro(mpi_exec)

marker = item.get_closest_marker("parallel")
mode = as_tuple(marker.kwargs.get("mode", 2))
for m in mode:
# Parse the `mode`
if isinstance(m, int):
nprocs = m
scheme = 'basic'
else:
if len(m) == 2:
nprocs, scheme = m
else:
raise ValueError("Can't run test: unexpected mode `%s`" % m)

pyversion = sys.executable
# Only spew tracebacks on rank 0.
# Run xfailing tests to ensure that errors are reported to calling process
if item.cls is not None:
testname = "%s::%s::%s" % (item.fspath, item.cls.__name__, item.name)
else:
testname = "%s::%s" % (item.fspath, item.name)
args = ["-n", "1", pyversion, "-m", "pytest", "--runxfail", "-s",
"-q", testname]
if nprocs > 1:
args.extend([":", "-n", "%d" % (nprocs - 1), pyversion, "-m", "pytest",
"--runxfail", "--tb=no", "-q", testname])
# OpenMPI requires an explicit flag for oversubscription. We need it as some
# of the MPI tests will spawn lots of processes
if mpi_distro == 'OpenMPI':
call = [mpi_exec, '--oversubscribe', '--timeout', '300'] + args
# Parse the `mode`
if isinstance(m, int):
nprocs = m
scheme = 'basic'
else:
if len(m) == 2:
nprocs, scheme = m
else:
call = [mpi_exec] + args
raise ValueError("Can't run test: unexpected mode `%s`" % m)

# Tell the MPI ranks that they are running a parallel test
os.environ['DEVITO_MPI'] = scheme
try:
check_call(call)
return True
except:
return False
finally:
os.environ['DEVITO_MPI'] = '0'
pyversion = sys.executable
# Only spew tracebacks on rank 0.
# Run xfailing tests to ensure that errors are reported to calling process
if item.cls is not None:
testname = "%s::%s::%s" % (item.fspath, item.cls.__name__, item.name)
else:
testname = "%s::%s" % (item.fspath, item.name)
args = ["-n", "1", pyversion, "-m", "pytest", "--runxfail", "-q", testname]
if nprocs > 1:
args.extend([":", "-n", "%d" % (nprocs - 1), pyversion, "-m", "pytest",
"--runxfail", "--tb=no", "-q", testname])
# OpenMPI requires an explicit flag for oversubscription. We need it as some
# of the MPI tests will spawn lots of processes
if mpi_distro == 'OpenMPI':
call = [mpi_exec, '--oversubscribe', '--timeout', '300'] + args
else:
call = [mpi_exec] + args

# Tell the MPI ranks that they are running a parallel test
os.environ['DEVITO_MPI'] = scheme
try:
check_call(call)
res = True
except:
res = False
finally:
os.environ['DEVITO_MPI'] = '0'
return res


def pytest_configure(config):
Expand All @@ -182,55 +179,45 @@ def pytest_configure(config):
)


def pytest_runtest_setup(item):
partest = os.environ.get('DEVITO_MPI', 0)
try:
partest = int(partest)
except ValueError:
pass
if item.get_closest_marker("parallel"):
if MPI is None:
pytest.skip("mpi4py/MPI not installed")
else:
# Blow away function arg in "master" process, to ensure
# this test isn't run on only one process
dummy_test = lambda *args, **kwargs: True
# For pytest <7
if item.cls is not None:
attr = item.originalname or item.name
setattr(item.cls, attr, dummy_test)
else:
item.obj = dummy_test
# For pytest >= 7
setattr(item, '_obj', dummy_test)
def pytest_generate_tests(metafunc):
# Process custom parallel marker as a parametrize to avoid
# running a single test for all modes
if 'mode' in metafunc.fixturenames:
markers = metafunc.definition.iter_markers()
for marker in markers:
if marker.name == 'parallel':
mode = list(as_tuple(marker.kwargs.get('mode', 2)))
metafunc.parametrize("mode", mode)


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_call(item):
partest = os.environ.get('DEVITO_MPI', 0)
try:
partest = int(partest)
except ValueError:
pass

if item.get_closest_marker("parallel") and not partest:
# Spawn parallel processes to run test
passed = parallel(item)
if not passed:
pytest.fail(f"{item} failed in parallel execution")
outcome = parallel(item, item.funcargs['mode'])
if outcome:
pytest.skip(f"{item} success in parallel")
else:
pytest.skip(f"{item}t passed in parallel execution")
pytest.fail(f"{item} failed in parallel")
else:
outcome = yield


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
outcome = yield
result = outcome.get_result()

partest = os.environ.get('DEVITO_MPI', 0)
try:
partest = int(partest)
except ValueError:
pass

if item.get_closest_marker("parallel") and not partest:
if call.when == 'call' and result.outcome == 'skipped':
result.outcome = 'passed'
Expand Down
2 changes: 1 addition & 1 deletion tests/test_autotuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def test_discarding_runs():


@pytest.mark.parallel(mode=[(2, 'diag'), (2, 'full')])
def test_at_w_mpi():
def test_at_w_mpi(mode):
"""Make sure autotuning works in presence of MPI. MPI ranks work
in isolation to determine the best block size, locally."""
grid = Grid(shape=(8, 8))
Expand Down
2 changes: 1 addition & 1 deletion tests/test_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_bench(mode, problem, op):

@pytest.mark.parallel(mode=2)
@switchconfig(profiling='advanced')
def test_run_mpi():
def test_run_mpi(mode):
"""
Test the `run` mode over MPI, with all key arguments used.
"""
Expand Down
8 changes: 4 additions & 4 deletions tests/test_builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_assign_subsampled_timefunction(self):
assert np.all(f.data == 1)

@pytest.mark.parallel(mode=4)
def test_assign_parallel(self):
def test_assign_parallel(self, mode):
a = np.arange(64).reshape((8, 8))
grid = Grid(shape=a.shape)

Expand Down Expand Up @@ -174,7 +174,7 @@ def test_gs_2d_float(self, sigma):
assert np.amax(np.abs(sp_smoothed - np.array(dv_smoothed))) <= 1e-5

@pytest.mark.parallel(mode=[(4, 'full')])
def test_gs_parallel(self):
def test_gs_parallel(self, mode):
a = np.arange(64).reshape((8, 8))
grid = Grid(shape=a.shape)

Expand Down Expand Up @@ -236,7 +236,7 @@ def test_nbl_zero(self):
assert np.all(a[:] - np.array(f.data[:]) == 0)

@pytest.mark.parallel(mode=4)
def test_if_parallel(self):
def test_if_parallel(self, mode):
a = np.arange(36).reshape((6, 6))
grid = Grid(shape=(18, 18))
x, y = grid.dimensions
Expand Down Expand Up @@ -292,7 +292,7 @@ def test_if_halo(self, ndim, nbl):

@pytest.mark.parametrize('nbl', [0, 2])
@pytest.mark.parallel(mode=4)
def test_if_halo_mpi(self, nbl):
def test_if_halo_mpi(self, nbl, mode):
"""
Test that FD halo is padded as well.
"""
Expand Down
Loading

0 comments on commit c836e03

Please sign in to comment.