Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compiler: Patch race conditions due to storage-related dependencies #1903

Merged
merged 5 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion devito/arch/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,13 @@ def __init__(self, *args, **kwargs):
self.cflags.remove('-std=c99')
self.cflags.remove('-O3')
self.cflags.remove('-Wall')
self.cflags += ['-std=c++11', '-acc:gpu', '-gpu=pinned', '-mp']

self.cflags += ['-std=c++11', '-mp']

platform = kwargs.pop('platform', configuration['platform'])
if platform is NVIDIAX:
self.cflags += ['-acc:gpu', '-gpu=pinned']

if not configuration['safe-math']:
self.cflags.append('-fast')
# Default PGI compile for a target is GPU and single threaded host.
Expand Down
9 changes: 6 additions & 3 deletions devito/core/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ class Cpu64NoopOperator(Cpu64OperatorMixin, CoreOperator):
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
compiler = kwargs['compiler']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: not for here, but in the long run we should gather all of these into a CompilerOptions class to avoid carrying multiple arguments around everywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a fair point, but I actually don't mind supplying the passes only the strict necessary rather than a big batch of things

sregistry = kwargs['sregistry']

# Distributed-memory parallelism
mpiize(graph, sregistry=sregistry, options=options)

# Shared-memory parallelism
if options['openmp']:
parizer = cls._Target.Parizer(sregistry, options, platform)
parizer = cls._Target.Parizer(sregistry, options, platform, compiler)
parizer.make_parallel(graph)
parizer.initialize(graph)

Expand Down Expand Up @@ -213,6 +214,7 @@ def _specialize_clusters(cls, clusters, **kwargs):
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
compiler = kwargs['compiler']
sregistry = kwargs['sregistry']

# Flush denormal numbers
Expand All @@ -225,7 +227,7 @@ def _specialize_iet(cls, graph, **kwargs):
relax_incr_dimensions(graph)

# Parallelism
parizer = cls._Target.Parizer(sregistry, options, platform)
parizer = cls._Target.Parizer(sregistry, options, platform, compiler)
parizer.make_simd(graph)
parizer.make_parallel(graph)
parizer.initialize(graph)
Expand Down Expand Up @@ -304,9 +306,10 @@ def callback(f):
def _make_iet_passes_mapper(cls, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
compiler = kwargs['compiler']
sregistry = kwargs['sregistry']

parizer = cls._Target.Parizer(sregistry, options, platform)
parizer = cls._Target.Parizer(sregistry, options, platform, compiler)

return {
'denormals': avoid_denormals,
Expand Down
9 changes: 6 additions & 3 deletions devito/core/gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,14 @@ class DeviceNoopOperator(DeviceOperatorMixin, CoreOperator):
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
compiler = kwargs['compiler']
sregistry = kwargs['sregistry']

# Distributed-memory parallelism
mpiize(graph, sregistry=sregistry, options=options)

# GPU parallelism
parizer = cls._Target.Parizer(sregistry, options, platform)
parizer = cls._Target.Parizer(sregistry, options, platform, compiler)
parizer.make_parallel(graph)
parizer.initialize(graph)

Expand Down Expand Up @@ -202,6 +203,7 @@ def _specialize_clusters(cls, clusters, **kwargs):
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
compiler = kwargs['compiler']
sregistry = kwargs['sregistry']

# Distributed-memory parallelism
Expand All @@ -211,7 +213,7 @@ def _specialize_iet(cls, graph, **kwargs):
relax_incr_dimensions(graph)

# GPU parallelism
parizer = cls._Target.Parizer(sregistry, options, platform)
parizer = cls._Target.Parizer(sregistry, options, platform, compiler)
parizer.make_parallel(graph)
parizer.initialize(graph)

Expand Down Expand Up @@ -282,9 +284,10 @@ def callback(f):
def _make_iet_passes_mapper(cls, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
compiler = kwargs['compiler']
sregistry = kwargs['sregistry']

parizer = cls._Target.Parizer(sregistry, options, platform)
parizer = cls._Target.Parizer(sregistry, options, platform, compiler)
orchestrator = cls._Target.Orchestrator(sregistry)

return {
Expand Down
30 changes: 21 additions & 9 deletions devito/ir/support/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ def aindices(self):
retval.append(dims.pop())
elif isinstance(i, Dimension):
retval.append(i)
elif q_constant(i):
retval.append(fi)
else:
retval.append(None)
return DimensionTuple(*retval, getters=self.findices)
Expand Down Expand Up @@ -262,10 +260,14 @@ def is_regular(self):
# space Dimensions
positions = []
for d in self.aindices:
for n, i in enumerate(self.intervals):
if i.dim._defines & d._defines:
positions.append(n)
break
try:
for n, i in enumerate(self.intervals):
if i.dim._defines & d._defines:
positions.append(n)
break
except AttributeError:
# `d is None` due to e.g. constant access
continue
return positions == sorted(positions)

def __lt__(self, other):
Expand Down Expand Up @@ -548,6 +550,15 @@ def is_cross(self):
def is_local(self):
return self.function.is_Symbol

@memoized_meth
def is_const(self, dim):
    """
    True if a constant dependence, that is no Dimensions involved, False otherwise.
    """
    # A dependence is constant along `dim` iff neither the source nor the
    # sink access uses a Dimension there (both aindices degenerate to None)
    # and the distance between the two accesses along `dim` is exactly zero
    return (self.source.aindices[dim] is None and
            self.sink.aindices[dim] is None and
            self.distance_mapper[dim] == 0)

@memoized_meth
def is_carried(self, dim=None):
"""Return True if definitely a dimension-carried dependence, False otherwise."""
Expand Down Expand Up @@ -623,9 +634,10 @@ def is_storage_related(self, dims=None):
cause the access of the same memory location, False otherwise.
"""
for d in self.findices:
if (d._defines & set(as_tuple(dims)) and
any(i.is_NonlinearDerived for i in d._defines)):
return True
if d._defines & set(as_tuple(dims)):
if any(i.is_NonlinearDerived for i in d._defines) or \
self.is_const(d):
return True
return False


Expand Down
9 changes: 7 additions & 2 deletions devito/mpi/halo_scheme.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def classify(exprs, ispace):
v[(d, LEFT)] = STENCIL
v[(d, RIGHT)] = STENCIL
else:
v[(d, i.aindices[d])] = NONE
v[(d, i[d])] = NONE

# Does `i` actually require a halo exchange?
if not any(hl is STENCIL for hl in v.values()):
Expand Down Expand Up @@ -426,7 +426,12 @@ def classify(exprs, ispace):
func = Max
candidates = [i for i in aindices if not is_integer(i)]
candidates = {(i.origin if d.is_Stepping else i) - d: i for i in candidates}
loc_indices[d] = candidates[func(*candidates.keys())]
try:
loc_indices[d] = candidates[func(*candidates.keys())]
except KeyError:
# E.g., `aindices = [0, 1, d+1]` -- it doesn't really matter
# what we put here, so we place 0 as it's the old behaviour
loc_indices[d] = 0

mapper[f] = HaloSchemeEntry(frozendict(loc_indices), frozenset(halos))

Expand Down
2 changes: 1 addition & 1 deletion devito/passes/clusters/asynchrony.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def callback(self, clusters, prefix):
else:
# Functions over non-stepping Dimensions need no lock
continue
except KeyError:
except (AttributeError, KeyError):
# Would degenerate to a scalar, but we rather use a lock
# of size 1 for simplicity
ld = CustomDimension(name='ld', symbolic_size=1)
Expand Down
25 changes: 22 additions & 3 deletions devito/passes/clusters/buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import numpy as np

from devito.ir import (Cluster, Forward, GuardBound, Interval, IntervalGroup,
IterationSpace, PARALLEL, Queue, Vector, lower_exprs, vmax, vmin)
IterationSpace, PARALLEL, Queue, SEQUENTIAL, Vector,
lower_exprs, normalize_properties, vmax, vmin)
from devito.exceptions import InvalidOperator
from devito.logger import warning
from devito.symbolics import retrieve_function_carriers, uxreplace
Expand Down Expand Up @@ -207,7 +208,16 @@ def callback(self, clusters, prefix, cache=None):
expr = lower_exprs(uxreplace(Eq(lhs, rhs), b.subdims_mapper))
ispace = b.written

processed.append(c.rebuild(exprs=expr, ispace=ispace))
# Buffering creates a storage-related dependence along the
# contracted dimensions
properties = dict(c.properties)
for d in b.contraction_mapper:
d = ispace[d].dim # E.g., `time_sub -> time`
properties[d] = normalize_properties(properties[d], {SEQUENTIAL})

processed.append(
c.rebuild(exprs=expr, ispace=ispace, properties=properties)
)

# Substitute buffered Functions with the newly created buffers
exprs = [uxreplace(e, subs) for e in c.exprs]
Expand All @@ -233,7 +243,16 @@ def callback(self, clusters, prefix, cache=None):
expr = lower_exprs(uxreplace(Eq(lhs, rhs), b.subdims_mapper))
ispace = b.written

processed.append(c.rebuild(exprs=expr, ispace=ispace))
# Buffering creates a storage-related dependence along the
# contracted dimensions
properties = dict(c.properties)
mloubout marked this conversation as resolved.
Show resolved Hide resolved
for d in b.contraction_mapper:
d = ispace[d].dim # E.g., `time_sub -> time`
properties[d] = normalize_properties(properties[d], {SEQUENTIAL})

processed.append(
c.rebuild(exprs=expr, ispace=ispace, properties=properties)
)

return processed

Expand Down
5 changes: 4 additions & 1 deletion devito/passes/iet/langbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class LangTransformer(ABC):
The constructs of the target language. To be specialized by a subclass.
"""

def __init__(self, key, sregistry, platform):
def __init__(self, key, sregistry, platform, compiler):
"""
Parameters
----------
Expand All @@ -195,13 +195,16 @@ def __init__(self, key, sregistry, platform):
The symbol registry, to access the symbols appearing in an IET.
platform : Platform
The underlying platform.
compiler : Compiler
The underlying JIT compiler.
"""
if key is not None:
self.key = key
else:
self.key = lambda i: False
self.sregistry = sregistry
self.platform = platform
self.compiler = compiler

@iet_pass
def make_parallel(self, iet):
Expand Down
13 changes: 13 additions & 0 deletions devito/passes/iet/languages/openmp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from distutils import version

import cgen as c
from sympy import Not

from devito.arch import AMDGPUX, NVIDIAX, INTELGPUX
from devito.arch.compiler import GNUCompiler
from devito.ir import (Call, Conditional, List, Prodder, ParallelIteration,
ParallelBlock, PointerCast, While, FindSymbols)
from devito.passes.iet.definitions import DataManager, DeviceAwareDataManager
Expand Down Expand Up @@ -186,8 +189,18 @@ class SimdOmpizer(PragmaSimdTransformer):


class Ompizer(PragmaShmTransformer):

    lang = OmpBB

    @classmethod
    def _support_array_reduction(cls, compiler):
        """
        True if the given backend compiler supports array reductions in
        OpenMP pragmas, False otherwise.
        """
        # GCC only gained support for array reductions in release 6.0;
        # any other compiler is assumed capable
        if not isinstance(compiler, GNUCompiler):
            return True
        return compiler.version >= version.StrictVersion("6.0")


class DeviceOmpizer(PragmaDeviceAwareTransformer):
lang = DeviceOmpBB
Expand Down
22 changes: 16 additions & 6 deletions devito/passes/iet/parpragma.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class PragmaShmTransformer(PragmaSimdTransformer):
and shared-memory-parallel IETs.
"""

def __init__(self, sregistry, options, platform):
def __init__(self, sregistry, options, platform, compiler):
"""
Parameters
----------
Expand All @@ -116,9 +116,11 @@ def __init__(self, sregistry, options, platform):
is greater than this threshold.
platform : Platform
The underlying platform.
compiler : Compiler
The underlying JIT compiler.
"""
key = lambda i: i.is_ParallelRelaxed and not i.is_Vectorized
super().__init__(key, sregistry, platform)
super().__init__(key, sregistry, platform, compiler)

self.collapse_ncores = options['par-collapse-ncores']
self.collapse_work = options['par-collapse-work']
Expand Down Expand Up @@ -209,14 +211,22 @@ def _select_candidates(self, candidates):

return root, list(collapsable)

@classmethod
def _support_array_reduction(cls, compiler):
    # Base-class default: assume the backend compiler can handle array
    # reductions; language-specific subclasses may override this to
    # blacklist known-unsupported compilers
    return True

def _make_reductions(self, partree):
if not any(i.is_ParallelAtomic for i in partree.collapsed):
return partree

exprs = [i for i in FindNodes(Expression).visit(partree) if i.is_Increment]
reduction = [i.output for i in exprs]
if all(i.is_Affine for i in partree.collapsed) or \
all(not i.is_Indexed for i in reduction):

test0 = all(not i.is_Indexed for i in reduction)
test1 = (self._support_array_reduction(self.compiler) and
all(i.is_Affine for i in partree.collapsed))

if test0 or test1:
# Implement reduction
mapper = {partree.root: partree.root._rebuild(reduction=reduction)}
else:
Expand Down Expand Up @@ -399,8 +409,8 @@ class PragmaDeviceAwareTransformer(DeviceAwareMixin, PragmaShmTransformer):
shared-memory-parallel, and device-parallel IETs.
"""

def __init__(self, sregistry, options, platform):
super().__init__(sregistry, options, platform)
def __init__(self, sregistry, options, platform, compiler):
super().__init__(sregistry, options, platform, compiler)

self.gpu_fit = options['gpu-fit']
self.par_tile = options['par-tile']
Expand Down
19 changes: 19 additions & 0 deletions tests/test_buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,25 @@ def test_multi_access():
assert np.all(w.data == w1.data)


def test_issue_1901():
    # Regression test: with `opt='buffering'`, the contracted `time` loop
    # must be classified as sequential, since buffering introduces a
    # storage-related dependence along the contracted Dimension
    grid = Grid(shape=(2, 2))
    time = grid.time_dim
    x, y = grid.dimensions

    usave = TimeFunction(name='usave', grid=grid, save=10)
    v = TimeFunction(name='v', grid=grid)

    # `time` (rather than `t`) is used in the subscript so that, absent the
    # fix, the loop would be fully PARALLEL -- which is what triggers the bug
    eq = [Eq(v[time, x, y], usave)]

    op = Operator(eq, opt='buffering')

    trees = retrieve_iteration_tree(op)
    assert len(trees) == 2
    assert trees[1].root.dim is time
    assert not trees[1].root.is_Parallel
    assert trees[1].root.is_Sequential  # Obv


def test_everything():
nt = 50
grid = Grid(shape=(6, 6))
Expand Down
Loading