Skip to content

Commit

Permalink
mpi: patch missing dimensions for halospot
Browse files Browse the repository at this point in the history
  • Loading branch information
mloubout committed Nov 9, 2023
1 parent ef3b03c commit 1d87f69
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 21 deletions.
5 changes: 4 additions & 1 deletion devito/ir/iet/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def iet_build(stree):
nsections += 1

elif i.is_Halo:
body = HaloSpot(queues.pop(i), i.halo_scheme)
try:
body = HaloSpot(queues.pop(i), i.halo_scheme)
except KeyError:
body = HaloSpot(None, i.halo_scheme)

elif i.is_Sync:
body = SyncSpot(i.sync_ops, body=queues.pop(i, None))
Expand Down
47 changes: 42 additions & 5 deletions devito/ir/stree/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,17 @@ def stree_build(clusters, profiler=None, **kwargs):
tip = augment_whole_subtree(c, tip, mapper, it)

# Attach NodeHalo if necessary
for it, v in mapper.items():
if needs_nodehalo(it.dim, c.halo_scheme):
v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent)
break
if c.exprs:
for it, v in mapper.items():
if needs_nodehalo(it.dim, c.halo_scheme):
v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent)
break
else:
for it, v in reversed(mapper.items()):
if needs_nodehalo_dim(it.dim, c.halo_scheme):
v.bottom.children = (*v.bottom.children,
NodeHalo(c.halo_scheme, v.bottom.parent))
break

# Add in NodeExprs
exprs = []
Expand Down Expand Up @@ -182,7 +189,33 @@ def preprocess(clusters, options=None, **kwargs):
processed.append(c.rebuild(exprs=[], ispace=ispace, syncs=syncs))

halo_scheme = HaloScheme.union([c1.halo_scheme for c1 in found])
processed.append(c.rebuild(halo_scheme=halo_scheme))
itdims = c.ispace.itdims
if halo_scheme:
# We make sure that the halo scheme placement is valid for this cluster
# Mainly if a cluster has iteration dimension {t, d, f}
# and the halo_scheme has distributed_aindices {d} and
# loc_dirs {f} then the haloscheme needs to be lifted into its own
# {t, f} cluster because it needs to be inside an {f} loop while it would
# be placed outside the {d} loop and therefore outside the {f} loop if
# attached to the cluster.
dindex = max([itdims.index(d) for d in halo_scheme.loc_dirs
if d in itdims] + [0])
aindex = max([itdims.index(d) for d in halo_scheme.distributed_aindices
if d in itdims] + [0])
if dindex > aindex:
hispace = c.ispace.project(itdims[:dindex])
else:
hispace = None
else:
hispace = None

if hispace and options['mpi']:
processed.append(c.rebuild(exprs=[],
ispace=hispace,
halo_scheme=halo_scheme))
processed.append(c.rebuild(halo_scheme=None))
else:
processed.append(c.rebuild(halo_scheme=halo_scheme))

# Sanity check!
try:
Expand Down Expand Up @@ -229,6 +262,10 @@ def needs_nodehalo(d, hs):
return d and hs and d._defines.intersection(hs.distributed_aindices)


def needs_nodehalo_dim(d, hs):
return d and hs and d._defines.intersection(hs.loc_indices)


def reuse_section(candidate, section):
try:
if not section or candidate.siblings[-1] is not section:
Expand Down
4 changes: 4 additions & 0 deletions devito/mpi/halo_scheme.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ def distributed_aindices(self):
def loc_indices(self):
return set().union(*[i.loc_indices.keys() for i in self.fmapper.values()])

@cached_property
def loc_dirs(self):
return set().union(*[i.loc_dirs.keys() for i in self.fmapper.values()])

@cached_property
def arguments(self):
return self.dimensions | set(flatten(self.honored.values()))
Expand Down
2 changes: 1 addition & 1 deletion devito/operations/interpolators.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def _augment_implicit_dims(self, implicit_dims, extras=None):
idims = self.sfunction.dimensions + as_tuple(implicit_dims) + extra
else:
idims = extra + as_tuple(implicit_dims) + self.sfunction.dimensions
return tuple(filter_ordered(idims))
return tuple(idims)

def _coeff_temps(self, implicit_dims):
return []
Expand Down
3 changes: 3 additions & 0 deletions devito/passes/iet/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def rule1(dep, candidates, loc_dims):
for q in d._defines])

for n, i in enumerate(iters):
if i not in scopes:
continue

candidates = [i.dim._defines for i in iters[n:]]

all_candidates = set().union(*candidates)
Expand Down
12 changes: 4 additions & 8 deletions examples/seismic/tti/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,7 @@ def ForwardOperator(model, geometry, space_order=4,

# Source and receivers
expr = src * dt / m if kernel == 'staggered' else src * dt**2 / m
stencils += src.inject(field=u.forward, expr=expr)
stencils += src.inject(field=v.forward, expr=expr)
stencils += src.inject(field=(u.forward, v.forward), expr=expr)
stencils += rec.interpolate(expr=u + v)

# Substitute spacing terms to reduce flops
Expand Down Expand Up @@ -601,8 +600,7 @@ def AdjointOperator(model, geometry, space_order=4,

# Construct expression to inject receiver values
expr = rec * dt / m if kernel == 'staggered' else rec * dt**2 / m
stencils += rec.inject(field=p.backward, expr=expr)
stencils += rec.inject(field=r.backward, expr=expr)
stencils += rec.inject(field=(p.backward, r.backward), expr=expr)

# Create interpolation expression for the adjoint-source
stencils += srca.interpolate(expr=p + r)
Expand Down Expand Up @@ -661,8 +659,7 @@ def JacobianOperator(model, geometry, space_order=4,
eqn2 = FD_kernel(model, du, dv, space_order, qu=lin_usrc, qv=lin_vsrc)

# Construct expression to inject source values, injecting at u0(t+dt)/v0(t+dt)
src_term = src.inject(field=u0.forward, expr=src * dt**2 / m)
src_term += src.inject(field=v0.forward, expr=src * dt**2 / m)
src_term = src.inject(field=(u0.forward, v0.forward), expr=src * dt**2 / m)

# Create interpolation expression for receivers, extracting at du(t)+dv(t)
rec_term = rec.interpolate(expr=du + dv)
Expand Down Expand Up @@ -716,8 +713,7 @@ def JacobianAdjOperator(model, geometry, space_order=4,
dm_update = Inc(dm, - (u0 * du.dt2 + v0 * dv.dt2))

# Add expression for receiver injection
rec_term = rec.inject(field=du.backward, expr=rec * dt**2 / m)
rec_term += rec.inject(field=dv.backward, expr=rec * dt**2 / m)
rec_term = rec.inject(field=(du.backward, dv.backward), expr=rec * dt**2 / m)

# Substitute spacing terms to reduce flops
return Operator(eqn + rec_term + [dm_update], subs=model.spacing_map,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_interpolation.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,8 @@ class SparseFirst(SparseFunction):
ds = DefaultDimension("ps", default_value=3)
grid = Grid((11, 11))
dims = grid.dimensions
s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3))
s.coordinates.data[:] = [[.5, .5], [.2, .2]]
s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3),
coordinates=[[.5, .5], [.2, .2]])

# Check dimensions and shape are correctly initialized
assert s.indices[s._sparse_position] == dr
Expand Down
42 changes: 38 additions & 4 deletions tests/test_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,42 @@ def test_precomputed_sparse(self, r):
Operator(sf1.interpolate(u))()
assert np.all(sf1.data == 4)

@pytest.mark.parallel(mode=1)
def test_sparse_first(self):
"""
Tests custom sprase function with sparse dimension as first index.
"""

class SparseFirst(SparseFunction):
""" Custom sparse class with the sparse dimension as the first one"""
_sparse_position = 0

dr = Dimension("cd")
ds = DefaultDimension("ps", default_value=3)
grid = Grid((11, 11))
dims = grid.dimensions
s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3),
coordinates=[[.5, .5], [.2, .2]])

# Check dimensions and shape are correctly initialized
assert s.indices[s._sparse_position] == dr
assert s.shape == (2, 3)
assert s.coordinates.indices[0] == dr

# Operator
u = TimeFunction(name="u", grid=grid, time_order=1)
fs = Function(name="fs", grid=grid, dimensions=(*dims, ds), shape=(11, 11, 3))

eqs = [Eq(u.forward, u+1), Eq(fs, u)]
# No time dependence so need the implicit dim
rec = s.interpolate(expr=s+fs, implicit_dims=grid.stepping_dim)
op = Operator(eqs + rec)

op(time_M=10)
print
expected = 10*11/2 # n (n+1)/2
assert np.allclose(s.data, expected)

@pytest.mark.parallel(mode=4)
def test_no_grid_dim_slow(self):
shape = (12, 13, 14)
Expand All @@ -624,11 +660,10 @@ class CoordSlowSparseFunction(SparseFunction):
rec_eq = s.interpolate(expr=u)

op = Operator([Eq(u, 1)] + rec_eq)
print(op)
op.apply()
assert np.all(s.data == 1)

@pytest.mark.parallel(mode=4)
@pytest.mark.parallel(mode=1)
def test_no_grid_dim_slow_time(self):
shape = (12, 13, 14)
nfreq = 5
Expand All @@ -651,8 +686,7 @@ class CoordSlowSparseFunction(SparseTimeFunction):
rec_eq = s.interpolate(expr=u)

op = Operator([Eq(u, 1)] + rec_eq)
print(op)
op.apply()
op.apply(time_M=5)
assert np.all(s.data == 1)


Expand Down

0 comments on commit 1d87f69

Please sign in to comment.