CI: revamp parallel marker #2347

Merged 1 commit on Apr 8, 2024
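In short, the revamp replaces the old scheme, in which `parallel()` looped over every entry of the marker's `mode` within a single test item, with a pytest-native one: `pytest_generate_tests` now expands `@pytest.mark.parallel(mode=...)` into a parametrization of a `mode` fixture, so each MPI mode becomes its own test item and every parallel test must accept a `mode` argument. A minimal sketch of the resulting test-side pattern (the test name and body are illustrative, not taken from this PR):

import numpy as np
import pytest

from devito import Eq, Function, Grid, Operator


# Each entry of `mode` becomes a separate test item; the `mode` argument is
# consumed by the conftest.py hooks, so the test body need not inspect it
@pytest.mark.parallel(mode=[2, (4, 'full')])
def test_example_mpi(mode):
    grid = Grid(shape=(8, 8))
    f = Function(name='f', grid=grid)
    Operator(Eq(f, f + 1)).apply()
    assert np.all(f.data == 1)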
conftest.py: 121 changes (54 additions & 67 deletions)
@@ -122,7 +122,7 @@ def EVAL(exprs, *args):
     return processed[0] if isinstance(exprs, str) else processed
 
 
-def parallel(item):
+def parallel(item, m):
     """
     Run a test in parallel. Readapted from:
 
@@ -131,47 +131,44 @@ def parallel(item):
     mpi_exec = 'mpiexec'
     mpi_distro = sniff_mpi_distro(mpi_exec)
 
-    marker = item.get_closest_marker("parallel")
-    mode = as_tuple(marker.kwargs.get("mode", 2))
-    for m in mode:
-        # Parse the `mode`
-        if isinstance(m, int):
-            nprocs = m
-            scheme = 'basic'
-        else:
-            if len(m) == 2:
-                nprocs, scheme = m
-            else:
-                raise ValueError("Can't run test: unexpected mode `%s`" % m)
-
-        pyversion = sys.executable
-        # Only spew tracebacks on rank 0.
-        # Run xfailing tests to ensure that errors are reported to calling process
-        if item.cls is not None:
-            testname = "%s::%s::%s" % (item.fspath, item.cls.__name__, item.name)
-        else:
-            testname = "%s::%s" % (item.fspath, item.name)
-        args = ["-n", "1", pyversion, "-m", "pytest", "--runxfail", "-s",
-                "-q", testname]
-        if nprocs > 1:
-            args.extend([":", "-n", "%d" % (nprocs - 1), pyversion, "-m", "pytest",
-                         "--runxfail", "--tb=no", "-q", testname])
-        # OpenMPI requires an explicit flag for oversubscription. We need it as some
-        # of the MPI tests will spawn lots of processes
-        if mpi_distro == 'OpenMPI':
-            call = [mpi_exec, '--oversubscribe', '--timeout', '300'] + args
-        else:
-            call = [mpi_exec] + args
-
-        # Tell the MPI ranks that they are running a parallel test
-        os.environ['DEVITO_MPI'] = scheme
-        try:
-            check_call(call)
-            return True
-        except:
-            return False
-        finally:
-            os.environ['DEVITO_MPI'] = '0'
+    # Parse the `mode`
+    if isinstance(m, int):
+        nprocs = m
+        scheme = 'basic'
+    else:
+        if len(m) == 2:
+            nprocs, scheme = m
+        else:
+            raise ValueError("Can't run test: unexpected mode `%s`" % m)
+
+    pyversion = sys.executable
+    # Only spew tracebacks on rank 0.
+    # Run xfailing tests to ensure that errors are reported to calling process
+    if item.cls is not None:
+        testname = "%s::%s::%s" % (item.fspath, item.cls.__name__, item.name)
+    else:
+        testname = "%s::%s" % (item.fspath, item.name)
+    args = ["-n", "1", pyversion, "-m", "pytest", "--runxfail", "-q", testname]
+    if nprocs > 1:
+        args.extend([":", "-n", "%d" % (nprocs - 1), pyversion, "-m", "pytest",
+                     "--runxfail", "--tb=no", "-q", testname])
+    # OpenMPI requires an explicit flag for oversubscription. We need it as some
+    # of the MPI tests will spawn lots of processes
+    if mpi_distro == 'OpenMPI':
+        call = [mpi_exec, '--oversubscribe', '--timeout', '300'] + args
+    else:
+        call = [mpi_exec] + args
+
+    # Tell the MPI ranks that they are running a parallel test
+    os.environ['DEVITO_MPI'] = scheme
+    try:
+        check_call(call)
+        res = True
+    except:
+        res = False
+    finally:
+        os.environ['DEVITO_MPI'] = '0'
+    return res
 
 
 def pytest_configure(config):
@@ -182,55 +179,45 @@ def pytest_configure(config):
     )
 
 
-def pytest_runtest_setup(item):
-    partest = os.environ.get('DEVITO_MPI', 0)
-    try:
-        partest = int(partest)
-    except ValueError:
-        pass
-    if item.get_closest_marker("parallel"):
-        if MPI is None:
-            pytest.skip("mpi4py/MPI not installed")
-        else:
-            # Blow away function arg in "master" process, to ensure
-            # this test isn't run on only one process
-            dummy_test = lambda *args, **kwargs: True
-            # For pytest <7
-            if item.cls is not None:
-                attr = item.originalname or item.name
-                setattr(item.cls, attr, dummy_test)
-            else:
-                item.obj = dummy_test
-            # For pytest >= 7
-            setattr(item, '_obj', dummy_test)
+def pytest_generate_tests(metafunc):
+    # Process custom parallel marker as a parametrize to avoid
+    # running a single test for all modes
+    if 'mode' in metafunc.fixturenames:
+        markers = metafunc.definition.iter_markers()
+        for marker in markers:
+            if marker.name == 'parallel':
+                mode = list(as_tuple(marker.kwargs.get('mode', 2)))
+                metafunc.parametrize("mode", mode)
 
 
 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
 def pytest_runtest_call(item):
     partest = os.environ.get('DEVITO_MPI', 0)
     try:
         partest = int(partest)
     except ValueError:
         pass
 
     if item.get_closest_marker("parallel") and not partest:
         # Spawn parallel processes to run test
-        passed = parallel(item)
-        if not passed:
-            pytest.fail(f"{item} failed in parallel execution")
+        outcome = parallel(item, item.funcargs['mode'])
+        if outcome:
+            pytest.skip(f"{item} success in parallel")
         else:
-            pytest.skip(f"{item} passed in parallel execution")
+            pytest.fail(f"{item} failed in parallel")
     else:
         outcome = yield
 
 
 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
 def pytest_runtest_makereport(item, call):
     outcome = yield
     result = outcome.get_result()
 
     partest = os.environ.get('DEVITO_MPI', 0)
     try:
         partest = int(partest)
     except ValueError:
         pass
 
     if item.get_closest_marker("parallel") and not partest:
         if call.when == 'call' and result.outcome == 'skipped':
             result.outcome = 'passed'
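For reference, a sketch of the `mpiexec` invocation that the revamped `parallel(item, m)` assembles for a single mode, assuming `m = (4, 'full')`, an OpenMPI distribution, and an illustrative interpreter path and test name (both hypothetical):

# Hypothetical value of `call` for m = (4, 'full') under OpenMPI: rank 0 runs
# pytest with tracebacks enabled, while the remaining 3 ranks run with --tb=no
call = [
    'mpiexec', '--oversubscribe', '--timeout', '300',
    '-n', '1', '/usr/bin/python3', '-m', 'pytest', '--runxfail', '-q',
    'tests/test_example.py::test_example_mpi',
    ':',
    '-n', '3', '/usr/bin/python3', '-m', 'pytest', '--runxfail', '--tb=no', '-q',
    'tests/test_example.py::test_example_mpi',
]
# DEVITO_MPI is set to 'full' (the scheme) before check_call(call) runs and
# reset to '0' afterwards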
tests/test_autotuner.py: 2 changes (1 addition & 1 deletion)
@@ -181,7 +181,7 @@ def test_discarding_runs():
 
 
 @pytest.mark.parallel(mode=[(2, 'diag'), (2, 'full')])
-def test_at_w_mpi():
+def test_at_w_mpi(mode):
     """Make sure autotuning works in presence of MPI. MPI ranks work
     in isolation to determine the best block size, locally."""
     grid = Grid(shape=(8, 8))
tests/test_benchmark.py: 2 changes (1 addition & 1 deletion)
@@ -71,7 +71,7 @@ def test_bench(mode, problem, op):
 
 @pytest.mark.parallel(mode=2)
 @switchconfig(profiling='advanced')
-def test_run_mpi():
+def test_run_mpi(mode):
     """
     Test the `run` mode over MPI, with all key arguments used.
     """
tests/test_builtins.py: 8 changes (4 additions & 4 deletions)
@@ -92,7 +92,7 @@ def test_assign_subsampled_timefunction(self):
         assert np.all(f.data == 1)
 
     @pytest.mark.parallel(mode=4)
-    def test_assign_parallel(self):
+    def test_assign_parallel(self, mode):
         a = np.arange(64).reshape((8, 8))
         grid = Grid(shape=a.shape)
 
@@ -174,7 +174,7 @@ def test_gs_2d_float(self, sigma):
         assert np.amax(np.abs(sp_smoothed - np.array(dv_smoothed))) <= 1e-5
 
     @pytest.mark.parallel(mode=[(4, 'full')])
-    def test_gs_parallel(self):
+    def test_gs_parallel(self, mode):
         a = np.arange(64).reshape((8, 8))
         grid = Grid(shape=a.shape)
 
@@ -236,7 +236,7 @@ def test_nbl_zero(self):
         assert np.all(a[:] - np.array(f.data[:]) == 0)
 
     @pytest.mark.parallel(mode=4)
-    def test_if_parallel(self):
+    def test_if_parallel(self, mode):
         a = np.arange(36).reshape((6, 6))
         grid = Grid(shape=(18, 18))
         x, y = grid.dimensions
@@ -292,7 +292,7 @@ def test_if_halo(self, ndim, nbl):
 
     @pytest.mark.parametrize('nbl', [0, 2])
     @pytest.mark.parallel(mode=4)
-    def test_if_halo_mpi(self, nbl):
+    def test_if_halo_mpi(self, nbl, mode):
         """
         Test that FD halo is padded as well.
         """