Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ndsl/stencils/testing/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ def parallel_savepoint_cases(

def pytest_generate_tests(metafunc):
backend = metafunc.config.getoption("backend")
if MPI is not None and MPI.COMM_WORLD.Get_size() > 1:
if MPI.COMM_WORLD.Get_size() > 1:
if metafunc.function.__name__ == "test_parallel_savepoint":
generate_parallel_stencil_tests(metafunc, backend=backend)
elif metafunc.function.__name__ == "test_sequential_savepoint":
Expand Down
4 changes: 2 additions & 2 deletions ndsl/stencils/testing/test_translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _get_thresholds(compute_function, input_data) -> None:

@pytest.mark.sequential
@pytest.mark.skipif(
MPI is not None and MPI.COMM_WORLD.Get_size() > 1,
MPI.COMM_WORLD.Get_size() > 1,
reason="Running in parallel with mpi",
)
def test_sequential_savepoint(
Expand Down Expand Up @@ -293,7 +293,7 @@ def get_tile_communicator(comm, layout):

@pytest.mark.parallel
@pytest.mark.skipif(
MPI is None or MPI.COMM_WORLD.Get_size() == 1,
MPI.COMM_WORLD.Get_size() == 1,
reason="Not running in parallel with mpi",
)
def test_parallel_savepoint(
Expand Down
179 changes: 100 additions & 79 deletions tests/dsl/test_caches.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import shutil
import sys
from pathlib import Path

import pytest
from gt4py.cartesian import config as gt_config
Expand All @@ -16,30 +17,38 @@
from ndsl.comm.mpi import MPI
from ndsl.dsl.dace.orchestration import orchestrate
from ndsl.dsl.gt4py import PARALLEL, Field, computation, interval
from ndsl.dsl.stencil import CompareToNumpyStencil, FrozenStencil
from tests.dsl import utils


def _make_storage(
func,
grid_indexing,
stencil_config: StencilConfig,
*,
dtype=float,
aligned_index=(0, 0, 0),
):
return func(
backend=stencil_config.compilation_config.backend,
shape=grid_indexing.domain,
dtype=dtype,
aligned_index=aligned_index,
)
@pytest.fixture
def tmp_cache_root(tmpdir):
original_root = gt_config.cache_settings["root_path"]
gt_config.cache_settings["root_path"] = tmpdir

yield tmpdir

# restore original cache settings
gt_config.cache_settings["root_path"] = original_root


@pytest.fixture
def restore_cache_dir():
cache_dir = gt_config.cache_settings["dir_name"]

yield

gt_config.cache_settings["dir_name"] = cache_dir


def _stencil(inp: Field[float], out: Field[float], scalar: float):
with computation(PARALLEL), interval(...):
out = inp


def _build_stencil(backend, orchestrated: DaCeOrchestration):
def _build_stencil(
backend: str, orchestrated: DaCeOrchestration
) -> tuple[FrozenStencil | CompareToNumpyStencil, GridIndexing, StencilConfig]:
# Make stencil and verify it ran
grid_indexing = GridIndexing(
domain=(5, 5, 5),
Expand All @@ -65,115 +74,127 @@ def _build_stencil(backend, orchestrated: DaCeOrchestration):


class OrchestratedProgram:
def __init__(self, backend, orchestration):
def __init__(self, backend, orchestration: DaCeOrchestration):
self.stencil, grid_indexing, stencil_config = _build_stencil(
backend, orchestration
)
orchestrate(obj=self, config=stencil_config.dace_config)
self.inp = _make_storage(ones, grid_indexing, stencil_config, dtype=float)
self.out = _make_storage(empty, grid_indexing, stencil_config, dtype=float)
self.inp = utils.make_storage(ones, grid_indexing, stencil_config, dtype=float)
self.out = utils.make_storage(empty, grid_indexing, stencil_config, dtype=float)

def __call__(self):
self.stencil(self.inp, self.out, self.inp[0, 0, 0])


@pytest.mark.parametrize(
"backend",
[
pytest.param("dace:cpu"),
],
)
@pytest.mark.skipif(
MPI is not None, reason="relocatibility checked with a one-rank setup"
MPI.COMM_WORLD.Get_size() > 1, reason="relocatibility checked with a one-rank setup"
)
def test_relocatability_orchestration(backend):
original_root_directory = gt_config.cache_settings["root_path"]
working_dir = str(os.getcwd())

def test_relocatability_orchestration(restore_cache_dir) -> None:
# Compile on default
p0 = OrchestratedProgram(backend, DaCeOrchestration.BuildAndRun)
p0 = OrchestratedProgram("dace:cpu", DaCeOrchestration.BuildAndRun)
p0()
assert os.path.exists(
f"{working_dir}/.gt_cache_FV3_A/dacecache/"
"test_caches_OrchestratedProgam___call__",
) or os.path.exists(
f"{working_dir}/.gt_cache_FV3_A/dacecache/OrchestratedProgam___call__",

expected_cache_dir = (
Path.cwd()
/ ".gt_cache_FV3_A"
/ "dacecache"
/ "test_caches_OrchestratedProgram___call__"
)
assert expected_cache_dir.exists()

# Compile in another directory

custom_path = f"{working_dir}/.my_cache_path"
gt_config.cache_settings["root_path"] = custom_path
@pytest.mark.skipif(
MPI.COMM_WORLD.Get_size() > 1, reason="relocatibility checked with a one-rank setup"
)
def test_relocatability_orchestration_tmpdir(restore_cache_dir, tmp_cache_root) -> None:
# Compile in temporary directory that is only available in this test session.
backend = "dace:cpu"
p1 = OrchestratedProgram(backend, DaCeOrchestration.BuildAndRun)
p1()
assert os.path.exists(
f"{custom_path}/.gt_cache_FV3_A/dacecache/"
"test_caches_OrchestratedProgam___call__",
) or os.path.exists(
f"{working_dir}/.gt_cache_FV3_A/dacecache/OrchestratedProgam___call__",

expected_cache_dir = (
tmp_cache_root
/ ".gt_cache_FV3_A"
/ "dacecache"
/ "test_caches_OrchestratedProgram___call__"
)
assert expected_cache_dir.exists()

# Check relocability by copying the second cache directory,
# changing the path of gt_config.cache_settings and trying to Run on it
relocated_path = f"{working_dir}/.my_relocated_cache_path"
shutil.copytree(custom_path, relocated_path, dirs_exist_ok=True)
relocated_path = tmp_cache_root / ".my_relocated_cache_path"
shutil.copytree(tmp_cache_root, relocated_path, dirs_exist_ok=False)
gt_config.cache_settings["root_path"] = relocated_path
p2 = OrchestratedProgram(backend, DaCeOrchestration.Run)
p2()

# Generate a file exists error to check for bad path
bogus_path = "./nope/notatall/nothappening"
bogus_path = "./nope/not_at_all/not_happening"
gt_config.cache_settings["root_path"] = bogus_path
with pytest.raises(RuntimeError):
OrchestratedProgram(backend, DaCeOrchestration.Run)

# Restore cache settings
gt_config.cache_settings["root_path"] = original_root_directory


@pytest.mark.parametrize(
"backend",
[
pytest.param("dace:cpu"),
],
)
@pytest.mark.skipif(
MPI is not None, reason="relocatibility checked with a one-rank setup"
MPI.COMM_WORLD.Get_size() > 1, reason="relocatibility checked with a one-rank setup"
)
def test_relocatability(backend: str):
# Restore original dir name
gt_config.cache_settings["dir_name"] = os.environ.get(
"GT_CACHE_DIR_NAME", f".gt_cache_{MPI.COMM_WORLD.Get_rank():06}"
)

backend_sanitized = backend.replace(":", "")

def test_relocatability(restore_cache_dir) -> None:
# Compile on default
backend = "dace:cpu"
p0 = OrchestratedProgram(backend, DaCeOrchestration.Python)
p0()
assert os.path.exists(
f"./.gt_cache_000000/py38_1013/{backend_sanitized}/test_caches/_stencil/"

backend_sanitized = backend.replace(":", "")
python_version = f"py{sys.version_info[0]}{sys.version_info[1]}"
expected_cache_path = (
Path.cwd()
/ ".gt_cache_000000"
/ f"{python_version}_1013"
/ f"{backend_sanitized}"
/ "test_caches"
/ "_stencil"
)
assert expected_cache_path.exists()

# Compile in another directory

custom_path = "./.my_cache_path"
gt_config.cache_settings["root_path"] = custom_path
@pytest.mark.skipif(
MPI.COMM_WORLD.Get_size() > 1, reason="relocatibility checked with a one-rank setup"
)
def test_relocatability_tmpdir(restore_cache_dir, tmp_cache_root) -> None:
# Compile in another directory
backend = "dace:cpu"
p1 = OrchestratedProgram(backend, DaCeOrchestration.Python)
p1()
assert os.path.exists(
f"{custom_path}/.gt_cache_000000/py38_1013/{backend_sanitized}"
"/test_caches/_stencil/"

backend_sanitized = backend.replace(":", "")
python_version = f"py{sys.version_info[0]}{sys.version_info[1]}"
expected_cache_path = (
tmp_cache_root
/ ".gt_cache_000000"
/ f"{python_version}_1013"
/ f"{backend_sanitized}"
/ "test_caches"
/ "_stencil"
)
assert expected_cache_path.exists()

# Check relocability by copying the second cache directory,
# Check relocability by copying the first cache directory,
# changing the path of gt_config.cache_settings and trying to Run on it
relocated_path = "./.my_relocated_cache_path"
shutil.copytree("./.gt_cache_000000", relocated_path, dirs_exist_ok=True)
relocated_path = tmp_cache_root / ".my_relocated_cache_path"
shutil.copytree(
tmp_cache_root / ".gt_cache_000000", relocated_path, dirs_exist_ok=False
)
gt_config.cache_settings["root_path"] = relocated_path

p2 = OrchestratedProgram(backend, DaCeOrchestration.Python)
p2()
assert os.path.exists(
f"{relocated_path}/.gt_cache_000000/py38_1013/{backend_sanitized}"
"/test_caches/_stencil/"

relocated_cache_path = (
relocated_path
/ ".gt_cache_000000"
/ f"{python_version}_1013"
/ f"{backend_sanitized}"
/ "test_caches"
/ "_stencil"
)
assert relocated_cache_path.exists()
16 changes: 8 additions & 8 deletions tests/dsl/test_compilation_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_safety_checks():
)
def test_check_communicator_valid(
size: int, use_minimal_caching: bool, run_mode: RunMode
):
) -> None:
partitioner = CubedSpherePartitioner(
TilePartitioner((int(sqrt(size / 6)), int((sqrt(size / 6)))))
)
Expand All @@ -50,7 +50,7 @@ def test_check_communicator_valid(
)
def test_check_communicator_invalid(
nx: int, ny: int, use_minimal_caching: bool, run_mode: RunMode
):
) -> None:
partitioner = CubedSpherePartitioner(TilePartitioner((nx, ny)))
comm = NullComm(rank=0, total_ranks=nx * ny * 6)
cubed_sphere_comm = CubedSphereCommunicator(comm, partitioner)
Expand All @@ -61,7 +61,7 @@ def test_check_communicator_invalid(
config.check_communicator(cubed_sphere_comm)


def test_get_decomposition_info_from_no_comm():
def test_get_decomposition_info_from_no_comm() -> None:
config = CompilationConfig()
(
computed_rank,
Expand All @@ -86,7 +86,7 @@ def test_get_decomposition_info_from_no_comm():
)
def test_get_decomposition_info_from_comm(
rank: int, size: int, is_compiling: bool, equivalent: int
):
) -> None:
partitioner = CubedSpherePartitioner(
TilePartitioner((int(sqrt(size / 6)), int(sqrt(size / 6))))
)
Expand Down Expand Up @@ -123,8 +123,8 @@ def test_get_decomposition_info_from_comm(
],
)
def test_determine_compiling_equivalent(
rank, size, minimal_caching, run_mode, equivalent
):
rank: int, size: int, minimal_caching: bool, run_mode: RunMode, equivalent: int
) -> None:
config = CompilationConfig(use_minimal_caching=minimal_caching, run_mode=run_mode)
partitioner = CubedSpherePartitioner(
TilePartitioner((sqrt(size / 6), sqrt(size / 6)))
Expand All @@ -138,7 +138,7 @@ def test_determine_compiling_equivalent(
)


def test_as_dict():
def test_as_dict() -> None:
config = CompilationConfig()
asdict = config.as_dict()
assert asdict["backend"] == "numpy"
Expand All @@ -151,7 +151,7 @@ def test_as_dict():
assert len(asdict) == 7


def test_from_dict():
def test_from_dict() -> None:
specification_dict = {}
config = CompilationConfig.from_dict(specification_dict)
assert config.backend == "numpy"
Expand Down
Loading