Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mpi: Enhance flexibility for custom topologies #2134

Merged
merged 9 commits into from
Jun 15, 2023
6 changes: 3 additions & 3 deletions devito/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ class Data(np.ndarray):
-----
NumPy array subclassing is described at: ::

https://docs.scipy.org/doc/numpy-1.13.0/user/basics.subclassing.html
https://numpy.org/doc/stable/user/basics.subclassing.html

Any view or copy created from ``self``, for instance via a slice operation
or a universal function ("ufunc" in NumPy jargon), will still be of type
Data.
`Data`.
"""

def __new__(cls, shape, dtype, decomposition=None, modulo=None, allocator=ALLOC_FLAT,
Expand Down Expand Up @@ -224,7 +224,7 @@ def __getitem__(self, glb_idx, comm_type, gather_rank=None):
glb_shape = self._distributor.glb_shape
retval = np.zeros(glb_shape, dtype=self.dtype.type)
start, stop, step = 0, 0, 1
for i, s in enumerate(sendcounts):
for i, _ in enumerate(sendcounts):
if i > 0:
start += sendcounts[i-1]
stop += sendcounts[i]
Expand Down
101 changes: 70 additions & 31 deletions devito/mpi/distributed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from abc import ABC, abstractmethod
from ctypes import c_int, c_void_p, sizeof
from itertools import groupby, product
from math import ceil
from abc import ABC, abstractmethod
from math import ceil, pow
from sympy import factorint

import atexit

from cached_property import cached_property
Expand Down Expand Up @@ -204,8 +206,7 @@ def __init__(self, shape, dimensions, input_comm=None, topology=None):
# guarantee that 9 ranks are arranged into a 3x3 grid when shape=(9, 9))
self._topology = compute_dims(self._input_comm.size, len(shape))
else:
# A custom topology may contain integers or the wildcard '*', which
# implies `nprocs // nstars`
# A custom topology may contain integers or the wildcard '*'
topology = CustomTopology(topology, self._input_comm)

self._topology = topology
Expand Down Expand Up @@ -566,51 +567,89 @@ def _arg_values(self, *args, **kwargs):
class CustomTopology(tuple):

"""
A CustomTopology is a mechanism to describe parametric domain decompositions.
The CustomTopology class provides a mechanism to describe parametric domain
decompositions. It allows users to specify how the dimensions of a domain are
decomposed into chunks based on certain parameters.

Examples
--------
Assuming a domain consisting of three distributed Dimensions x, y, and z, and
an MPI communicator comprising N processes, a CustomTopology might be:
For example, let's consider a domain with three distributed dimensions: x, y, and z,
and an MPI communicator with N processes. Here are a few examples of CustomTopology:

With N known, say N=4:

* `(1, 1, 4)`: the z Dimension is decomposed into 4 chunks
* `(2, 1, 2)`: the x Dimension is decomposed into 2 chunks; the z Dimension
* `(2, 1, 2)`: the x Dimension is decomposed into 2 chunks and the z Dimension
is decomposed into 2 chunks

With N unknown:

* `(1, '*', 1)`: the wildcard `'*'` tells the runtime to decompose the y
* `(1, '*', 1)`: the wildcard `'*'` indicates that the runtime should decompose the y
Dimension into N chunks
* `('*', '*', 1)`: the wildcard `'*'` tells the runtime to decompose both the
x and y Dimensions into N / 2 chunks respectively.
* `('*', '*', 1)`: the wildcard `'*'` indicates that the runtime should decompose both
the x and y Dimensions into `nstars` factors of N, prioritizing
the outermost dimension

If the number of ranks `N` cannot be decomposed evenly across the requested number
of stars (`nstars`), it is decomposed as evenly as possible, prioritising the outermost dimension

Raises
------
The number of `'*'` must evenly divide N, otherwise a ValueError exception
is raised.
If the wildcard `'*'` is used, then the CustomTopology can only contain either
`'*'` or 1's, otherwise a ValueError exception is raised.
For N=3
* `('*', '*', 1)` gives: (3, 1, 1)
* `('*', 1, '*')` gives: (3, 1, 1)
* `(1, '*', '*')` gives: (1, 3, 1)

For N=6
* `('*', '*', 1)` gives: (3, 2, 1)
* `('*', 1, '*')` gives: (3, 1, 2)
* `(1, '*', '*')` gives: (1, 3, 2)

For N=8
* `('*', '*', '*')` gives: (2, 2, 2)
* `('*', '*', 1)` gives: (4, 2, 1)
* `('*', 1, '*')` gives: (4, 1, 2)
* `(1, '*', '*')` gives: (1, 4, 2)

Notes
-----
Users shouldn't use this class directly. It's up to the Devito runtime to
instantiate it based on the user input.
Users should not directly use the CustomTopology class. It is instantiated
by the Devito runtime based on user input.
"""

def __new__(cls, items, input_comm):
nstars = len([i for i in items if i == '*'])
if nstars > 0:
if input_comm.size % nstars != 0:
raise ValueError("Invalid `topology` for given nprocs")
if any(i not in ('*', 1) for i in items):
raise ValueError("Custom topology must be only 1 or *")

v = input_comm.size // nstars
processed = [i if i == 1 else v for i in items]
else:
# Keep track of nstars and already defined decompositions
nstars = items.count('*')

# If no stars exist we are ready
if nstars == 0:
processed = items
else:
# Init decomposition list and track star positions
processed = [1] * len(items)
star_pos = []
for i, item in enumerate(items):
if isinstance(item, int):
processed[i] = item
else:
star_pos.append(i)

# Compute the remaining procs to be allocated
alloc_procs = np.prod([i for i in items if i != '*'])
rem_procs = int(input_comm.size // alloc_procs)

# List of all factors of rem_procs in decreasing order
factors = factorint(rem_procs)
vals = [k for (k, v) in factors.items() for _ in range(v)][::-1]

# Split in number of stars
split = np.array_split(vals, nstars)

# Reduce
star_vals = [int(np.prod(s)) for s in split]

# Apply computed star values to the processed
for index, value in zip(star_pos, star_vals):
processed[index] = value

# Final check that topology matches the communicator size
assert np.prod(processed) == input_comm.size

obj = super().__new__(cls, processed)
obj.logical = items
Expand Down
1 change: 0 additions & 1 deletion examples/seismic/test_seismic_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def not_bcs(bc):
@pytest.mark.parametrize('nbl, bcs', [
(20, ("mask", 1)), (0, ("mask", 1)),
(20, ("damp", 0)), (0, ("damp", 0))

])
def test_damp(nbl, bcs):
shape = (21, 21)
Expand Down
44 changes: 44 additions & 0 deletions tests/test_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
retrieve_iteration_tree)
from devito.mpi import MPI
from devito.mpi.routines import HaloUpdateCall, HaloUpdateList, MPICall
from devito.mpi.distributed import CustomTopology
from devito.tools import Bunch
from examples.seismic.acoustic import acoustic_setup

pytestmark = skipif(['nompi'], whole_module=True)
Expand Down Expand Up @@ -180,6 +182,48 @@ def test_custom_topology(self):
assert f2.shape == expected[distributor.myrank]
assert f2.size_global == f.size_global

# Each case is (comm_size, topology, dist_topology):
#   comm_size     - the `size` attribute given to the stand-in communicator
#   topology      - the user-supplied topology, mixing integers and the wildcard '*'
#   dist_topology - the tuple the resulting CustomTopology is expected to equal
@pytest.mark.parametrize('comm_size, topology, dist_topology', [
(2, (1, '*'), (1, 2)),
(2, ('*', '*'), (2, 1)),
(1, (1, '*', '*'), (1, 1, 1)),
(2, (1, '*', '*'), (1, 2, 1)),
(2, (2, '*', '*'), (2, 1, 1)),
(3, (1, '*', '*'), (1, 3, 1)),
(3, ('*', '*', 1), (3, 1, 1)),
(4, (2, '*', '*'), (2, 2, 1)),
(4, ('*', '*', 2), (2, 1, 2)),
(6, ('*', '*', 1), (3, 2, 1)),
(6, (1, '*', '*'), (1, 3, 2)),
(6, ('*', '*', '*'), (3, 2, 1)),
(12, ('*', '*', '*'), (3, 2, 2)),
(12, ('*', 3, '*'), (2, 3, 2)),
(18, ('*', '*', '*'), (3, 3, 2)),
(18, ('*', '*', 9), (2, 1, 9)),
(18, ('*', '*', 3), (3, 2, 3)),
(24, ('*', '*', '*'), (6, 2, 2)),
(32, ('*', '*', '*'), (4, 4, 2)),
(8, ('*', 1, '*'), (4, 1, 2)),
(8, ('*', '*', 1), (4, 2, 1)),
(8, ('*', '*', '*'), (2, 2, 2)),
(9, ('*', '*', '*'), (3, 3, 1)),
(11, (1, '*', '*'), (1, 11, 1)),
(22, ('*', '*', '*'), (11, 2, 1)),
(16, ('*', 1, '*'), (4, 1, 4)),
(32, ('*', '*', 1), (8, 4, 1)),
(64, ('*', '*', 1), (8, 8, 1)),
(64, ('*', 2, 4), (8, 2, 4)),
(128, ('*', '*', 1), (16, 8, 1)),
(231, ('*', '*', '*'), (11, 7, 3)),
(256, (1, '*', '*'), (1, 16, 16)),
(256, ('*', '*', '*'), (8, 8, 4)),
(256, ('*', '*', 2), (16, 8, 2)),
(256, ('*', 32, 2), (4, 32, 2)),
])
def test_custom_topology_v2(self, comm_size, topology, dist_topology):
# Check that CustomTopology resolves the '*' wildcards to the expected tuple.
# A Bunch carrying only `size` stands in for the communicator, so no real
# communicator of `comm_size` ranks is constructed here.
dummy_comm = Bunch(size=comm_size)
custom_topology = CustomTopology(topology, dummy_comm)
# CustomTopology subclasses tuple, so it compares directly against the expected tuple
assert custom_topology == dist_topology


class TestFunction(object):

Expand Down