Skip to content

Commit

Permalink
Merge pull request #1702 from devitocodes/revamp-streaming-final
Browse files Browse the repository at this point in the history
compiler: Revamp data streaming
  • Loading branch information
FabioLuporini authored Jun 17, 2021
2 parents fd24354 + e21744f commit 87ee495
Show file tree
Hide file tree
Showing 35 changed files with 2,165 additions and 940 deletions.
32 changes: 16 additions & 16 deletions .github/workflows/pytest-gpu.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Runner information:
# OpenACC on NVidia runs on `sarlaac`
# OpenMP on NVidia runs on `kimogila`
# OpenMP on AMD runs on `wampa`

name: CI-gpu

Expand Down Expand Up @@ -41,23 +42,21 @@ jobs:

matrix:
name: [
# NOTE: We can re-instate this as a 'failing' build
# as soon as the hardware is ready
pytest-gpu-omp,
pytest-gpu-acc
# pytest-gpu-aomp
pytest-gpu-omp-nvidia,
pytest-gpu-acc-nvidia,
pytest-gpu-omp-amd
]
include:
- name: pytest-gpu-omp
- name: pytest-gpu-omp-nvidia
test_file: "tests/test_gpu_openmp.py"
env_file: "devito-ci-nvidia-openmp.env"
test_drive_cmd: "nvidia-smi"
arch: "clang"
platform: "nvidiaX"
language: "openmp"
tags: ["self-hosted", "gpu", "openmp"]
tags: ["self-hosted", "gpu", "openmp", "clang"]

- name: pytest-gpu-acc
- name: pytest-gpu-acc-nvidia
test_file: "tests/test_gpu_openacc.py"
env_file: "devito-ci-nvidia-openacc.env"
test_drive_cmd: "nvidia-smi"
Expand All @@ -66,14 +65,14 @@ jobs:
language: "openacc"
tags: ["self-hosted", "gpu", "openacc"]

# - name: pytest-gpu-aomp
# test_file: "tests_gpu_aomp.py"
# env_file: "devito-ci-amd-openmp.py"
# test_drive_cmd: "TODO"
# arch: "aomp"
# platform: "amdgpuX"
# language: "openmp"
# tags: ["self-hosted", "gpu", "aomp"]
- name: pytest-gpu-omp-amd
test_file: "tests/test_gpu_openmp.py"
env_file: "devito-ci-amd-openmp.env"
test_drive_cmd: "rocm-smi"
arch: "aomp"
platform: "amdgpuX"
language: "openmp"
tags: ["self-hosted", "gpu", "openmp", "aomp"]

steps:
- name: Checkout devito
Expand Down Expand Up @@ -105,6 +104,7 @@ jobs:
pytest examples/seismic/viscoelastic/viscoelastic_example.py
- name: Test examples with MPI
if: matrix.name != 'pytest-gpu-omp-amd'
run: |
DEVITO_MPI=1 mpirun -n 2 pytest examples/seismic/acoustic/acoustic_example.py
DEVITO_MPI=1 mpirun -n 2 pytest examples/seismic/elastic/elastic_example.py
Expand Down
2 changes: 1 addition & 1 deletion devito/arch/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ def __init__(self, *args, **kwargs):
self.cflags.remove('-std=c99')
self.cflags.remove('-O3')
self.cflags.remove('-Wall')
self.cflags += ['-std=c++11', '-acc:gpu', '-mp']
self.cflags += ['-std=c++11', '-acc:gpu', '-gpu=pinned', '-mp']
if not configuration['safe-math']:
self.cflags.append('-fast')
# Default PGI compile for a target is GPU and single threaded host.
Expand Down
2 changes: 1 addition & 1 deletion devito/core/autotuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def autotune(operator, args, level, mode):
for i in p.fields:
setattr(at_args[p.name]._obj, i, MPI.PROC_NULL)
elif isinstance(p, MPIMsgEnriched):
at_args.update(MPIMsgEnriched(p.name, p.function, p.halos)._arg_values())
at_args.update(MPIMsgEnriched(p.name, p.target, p.halos)._arg_values())
for i in at_args[p.name]:
i.fromrank = MPI.PROC_NULL
i.torank = MPI.PROC_NULL
Expand Down
25 changes: 9 additions & 16 deletions devito/core/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from devito.core.operator import CoreOperator, CustomOperator
from devito.exceptions import InvalidOperator
from devito.passes.equations import buffering, collect_derivatives
from devito.passes.clusters import (Lift, blocking, cire, cse, extract_increments,
factorize, fuse, optimize_pows)
from devito.passes.equations import collect_derivatives
from devito.passes.clusters import (Lift, blocking, buffering, cire, cse,
extract_increments, factorize, fuse, optimize_pows)
from devito.passes.iet import (CTarget, OmpTarget, avoid_denormals, mpiize,
optimize_halospots, hoist_prodders, relax_incr_dimensions)
from devito.tools import timed_pass
Expand Down Expand Up @@ -273,28 +273,21 @@ def _make_dsl_passes_mapper(cls, **kwargs):
}

@classmethod
def _make_exprs_passes_mapper(cls, **kwargs):
def _make_clusters_passes_mapper(cls, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

# This callback simply mimics `is_on_device`, used in the device backends.
# It's used by `buffering` to replace `save!=None` TimeFunctions with buffers
# Callback used by `buffering`; it mimics `is_on_device`, which is used
# on device backends
def callback(f):
if f.is_TimeFunction and f.save is not None:
return [f.time_dim]
else:
return None

return {
'buffering': lambda i: buffering(i, callback, options)
}

@classmethod
def _make_clusters_passes_mapper(cls, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

return {
'buffering': lambda i: buffering(i, callback, sregistry, options),
'blocking': lambda i: blocking(i, options),
'factorize': factorize,
'fuse': fuse,
Expand Down
30 changes: 12 additions & 18 deletions devito/core/gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

from devito.core.operator import CoreOperator, CustomOperator
from devito.exceptions import InvalidOperator
from devito.passes.equations import collect_derivatives, buffering
from devito.passes.clusters import (Lift, Streaming, Tasker, blocking, cire, cse,
extract_increments, factorize, fuse, optimize_pows)
from devito.passes.equations import collect_derivatives
from devito.passes.clusters import (Lift, Streaming, Tasker, blocking, buffering,
cire, cse, extract_increments, factorize,
fuse, optimize_pows)
from devito.passes.iet import (DeviceOmpTarget, DeviceAccTarget, optimize_halospots,
mpiize, hoist_prodders, is_on_device)
from devito.tools import as_tuple, timed_pass
Expand Down Expand Up @@ -218,30 +219,23 @@ def _make_dsl_passes_mapper(cls, **kwargs):
}

@classmethod
def _make_exprs_passes_mapper(cls, **kwargs):
def _make_clusters_passes_mapper(cls, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

# Callbacks used by `Tasking` and `Streaming`
runs_on_host, reads_if_on_host = make_callbacks(options)

# This callback is used by `buffering` to replace host Functions with
# Arrays, used as device buffers for streaming-in and -out of data
# Callback used by `buffering`
def callback(f):
if not is_on_device(f, options['gpu-fit']):
return [f.time_dim]
else:
return None

return {
'buffering': lambda i: buffering(i, callback, options)
}

@classmethod
def _make_clusters_passes_mapper(cls, **kwargs):
options = kwargs['options']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

runs_on_host, reads_if_on_host = make_callbacks(options)

return {
'buffering': lambda i: buffering(i, callback, sregistry, options),
'blocking': lambda i: blocking(i, options),
'tasking': Tasker(runs_on_host).process,
'streaming': Streaming(reads_if_on_host).process,
Expand Down
14 changes: 9 additions & 5 deletions devito/ir/iet/efunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from devito.ir.iet.nodes import (BlankLine, Call, Callable, Conditional, Dereference,
DummyExpr, Iteration, List, PointerCast, Return, While)
from devito.ir.iet.utils import derive_parameters, diff_parameters
from devito.ir.iet.visitors import FindSymbols
from devito.symbolics import CondEq, CondNe, FieldFromComposite, FieldFromPointer, Macro
from devito.tools import as_tuple
from devito.types import PThreadArray, SharedData, Symbol, VoidPointer
Expand Down Expand Up @@ -164,17 +165,17 @@ def _make_thread_init(threads, tfunc, isdata, sdata, sregistry):


def _make_thread_func(name, iet, root, threads, sregistry):
sid = SharedData._symbolic_id

# Create the SharedData, that is the data structure that will be used by the
# main thread to pass information dows to the child thread(s)
required, parameters, dynamic_parameters = diff_parameters(iet, root)
required, parameters, dynamic_parameters = diff_parameters(iet, root, [sid])
parameters = sorted(parameters, key=lambda i: i.is_Function) # Allow casting
sdata = SharedData(name=sregistry.make_name(prefix='sdata'), npthreads=threads.size,
fields=required, dynamic_fields=dynamic_parameters)

sbase = sdata.symbolic_base
sid = sdata.symbolic_id

# Create a Callable to initialize `sdata` with the known const values
sbase = sdata.symbolic_base
iname = 'init_%s' % sdata.dtype._type_.__name__
ibody = [DummyExpr(FieldFromPointer(i._C_name, sbase), i._C_symbol)
for i in parameters]
Expand Down Expand Up @@ -210,10 +211,13 @@ def _make_thread_func(name, iet, root, threads, sregistry):
tparameter = VoidPointer('_%s' % sdata.name)

# Unpack `sdata`
symbol_names = {i.name for i in FindSymbols('free-symbols').visit(iet)}
unpack = [PointerCast(sdata, tparameter), BlankLine]
for i in parameters:
if i.is_AbstractFunction:
unpack.extend([Dereference(i, sdata), PointerCast(i)])
unpack.append(Dereference(i, sdata))
if i.name in symbol_names:
unpack.append(PointerCast(i))
else:
unpack.append(DummyExpr(i, FieldFromPointer(i.name, sbase)))
unpack.append(DummyExpr(sid, FieldFromPointer(sdata._field_id, sbase)))
Expand Down
23 changes: 18 additions & 5 deletions devito/ir/iet/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,25 @@ def __repr__(self):

@property
def functions(self):
retval = [i.function for i in self.arguments
if isinstance(i, (AbstractFunction, Indexed, LocalObject))]
retval = []
for i in self.arguments:
if isinstance(i, numbers.Number):
continue
elif isinstance(i, (AbstractFunction, Indexed, LocalObject)):
retval.append(i.function)
else:
for s in i.free_symbols:
try:
f = s.function
except AttributeError:
continue
if isinstance(f, AbstractFunction):
retval.append(f)
if self.base is not None:
retval.append(self.base.function)
if self.retobj is not None:
retval.append(self.retobj.function)
return tuple(retval)
return tuple(filter_ordered(retval))

@property
def children(self):
Expand Down Expand Up @@ -1054,9 +1066,10 @@ class PragmaList(List):
A floating sequence of pragmas.
"""

def __init__(self, pragmas, functions=None, **kwargs):
def __init__(self, pragmas, functions=None, free_symbols=None, **kwargs):
super().__init__(header=pragmas)
self._functions = as_tuple(functions)
self._free_symbols = as_tuple(free_symbols)

@property
def pragmas(self):
Expand All @@ -1068,7 +1081,7 @@ def functions(self):

@property
def free_symbols(self):
return self._functions
return self._free_symbols


class ParallelIteration(Iteration):
Expand Down
9 changes: 6 additions & 3 deletions devito/ir/iet/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from devito.ir.iet import Iteration, List, IterationTree, FindSections, FindSymbols
from devito.symbolics import Literal, Macro
from devito.tools import flatten, split
from devito.tools import as_tuple, flatten, split
from devito.types import Array, LocalObject

__all__ = ['filter_iterations', 'retrieve_iteration_tree', 'compose_nodes',
Expand Down Expand Up @@ -116,16 +116,19 @@ def derive_parameters(iet, drop_locals=False):
return parameters


def diff_parameters(iet, root):
def diff_parameters(iet, root, indirectly_provided=None):
"""
Derive the parameters of a sub-IET, `iet`, within a Callable, `root`, and
split them into two groups:
* the "read-only" parameters, and
* the "dynamic" parameters, whose value changes at some point in `root`.
The `indirectly_provided` are the parameters that are provided indirectly to
`iet`, for example via a composite type (e.g., a C struct).
"""
# TODO: this is currently very rudimentary
required = derive_parameters(iet)
required = [i for i in required if i not in as_tuple(indirectly_provided)]

known = set(root.parameters) | set(i for i in required if i.is_Array)

Expand Down
3 changes: 1 addition & 2 deletions devito/ir/iet/visitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,7 @@ def visit_ExprStmt(self, o, ret=None, queue=None):
ret[o] = as_tuple(queue)
return ret

visit_Conditional = FindSections.visit_Node

visit_Conditional = FindSections.visit_Iteration
visit_Block = FindSections.visit_Iteration


Expand Down
9 changes: 7 additions & 2 deletions devito/ir/stree/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ def attach_metadata(cluster, d, tip):
return tip

for c in clusters:
pointers = list(mapper)
# Add in any Conditionals and Syncs outside of the outermost Iteration
tip = attach_metadata(c, None, stree)

if tip is stree:
pointers = list(mapper)
else:
pointers = []

index = 0
tip = stree
for it0, it1 in zip(c.itintervals, pointers):
if it0 != it1:
break
Expand Down
10 changes: 7 additions & 3 deletions devito/ir/support/space.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,10 @@ class IterationInterval(object):
An Interval associated with an IterationDirection.
"""

def __init__(self, interval, direction, sub_iterators):
def __init__(self, interval, sub_iterators, direction):
self.interval = interval
self.direction = direction
self.sub_iterators = sub_iterators
self.direction = direction

def __repr__(self):
return "%s%s" % (self.interval, self.direction)
Expand All @@ -515,6 +515,10 @@ def __eq__(self, other):
def __hash__(self):
return hash((self.interval, self.direction))

@property
def args(self):
return (self.interval, self.sub_iterators, self.direction)

@property
def dim(self):
return self.interval.dim
Expand Down Expand Up @@ -838,7 +842,7 @@ def directions(self):
@cached_property
def itintervals(self):
return tuple(IterationInterval(
i, self.directions[i.dim], self.sub_iterators.get(i.dim)
i, self.sub_iterators.get(i.dim), self.directions[i.dim]
) for i in self.intervals)

@cached_property
Expand Down
Loading

0 comments on commit 87ee495

Please sign in to comment.