Skip to content

Commit

Permalink
Merge pull request #694 from adi611/adi611-patch-testpsij-1
Browse files Browse the repository at this point in the history
Draft: Adding a new worker that uses PSI/J to run tasks
  • Loading branch information
djarecka committed Sep 24, 2023
2 parents 0245cdc + 2c695d5 commit 0e3d33c
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 1 deletion.
45 changes: 45 additions & 0 deletions .github/workflows/testpsijlocal.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Run the pydra test suite against the PSI/J "local" executor on Linux and macOS.
name: PSI/J-Local

on:
  push:
    branches:
      - master
  pull_request:

# Cancel an in-flight run when a newer commit arrives on the same ref.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

permissions:
  contents: read

jobs:
  test:
    strategy:
      matrix:
        os: [ubuntu-latest, macos-latest]
        # Quoted so YAML keeps the version a string.
        python-version: ['3.11']
      # Let the other matrix leg finish even if one OS fails.
      fail-fast: false
    runs-on: ${{ matrix.os }}

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          repository: ${{ github.repository }}

      - name: Setup Python version ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies for PSI/J
        run: |
          pip install -e ".[test, psij]"

      - name: Run tests for PSI/J
        run: |
          pytest --color=yes -vs --psij=local -n auto pydra/engine --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml

      - name: Upload to codecov
        run: codecov -f cov.xml -F unittests -e GITHUB_WORKFLOW
54 changes: 54 additions & 0 deletions .github/workflows/testpsijslurm.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Run the pydra test suite against the PSI/J "slurm" executor inside a
# CentOS 7 SLURM docker image.
name: PSI/J-SLURM

on:
  push:
    branches:
      - master
  pull_request:

jobs:
  build:
    strategy:
      matrix:
        # Quoted so YAML keeps the version a string.
        python-version: ["3.11.5"]
      fail-fast: false
    runs-on: ubuntu-latest
    env:
      DOCKER_IMAGE: adi611/docker-centos7-slurm:23.02.1

    steps:
      - name: Disable etelemetry
        run: echo "NO_ET=TRUE" >> $GITHUB_ENV
      - uses: actions/checkout@v4
      - name: Pull docker image
        run: |
          docker pull $DOCKER_IMAGE
          # Have image running in the background
          docker run `bash <(curl -s https://codecov.io/env)` -itd -h slurmctl --cap-add sys_admin -d --name slurm -v `pwd`:/pydra -e NO_ET=$NO_ET $DOCKER_IMAGE
      - name: Display previous jobs with sacct
        run: |
          echo "Allowing ports/daemons time to start" && sleep 10
          docker exec slurm bash -c "sacctmgr -i add account none,test Cluster=linux Description='none' Organization='none'"
          # Original used `2&> /dev/null`, which passes a literal "2" to the
          # command and mis-redirects; also, under the default `bash -e` shell
          # a failing command aborts before `$?` can be checked, so the error
          # path is expressed with `||` instead.
          docker exec slurm bash -c "sacct && sinfo && squeue" > /dev/null 2>&1 || {
            echo "Slurm docker image error"
            exit 1
          }
      - name: Setup Python
        run: |
          docker exec slurm bash -c "echo $NO_ET"
          docker exec slurm bash -c "ls -la && echo list top level dir"
          docker exec slurm bash -c "ls -la /pydra && echo list pydra dir"
          if [[ "${{ matrix.python-version }}" == "3.11.5" ]]; then
            docker exec slurm bash -c "CONFIGURE_OPTS=\"-with-openssl=/opt/openssl\" pyenv install -v 3.11.5"
          fi
          docker exec slurm bash -c "pyenv global ${{ matrix.python-version }}"
          docker exec slurm bash -c "pip install --upgrade pip && pip install -e /pydra[test,psij] && python -c 'import pydra; print(pydra.__version__)'"
      - name: Run pytest
        run: |
          docker exec slurm bash -c "pytest --color=yes -vs -n auto --psij=slurm --cov pydra --cov-config /pydra/.coveragerc --cov-report xml:/pydra/cov.xml --doctest-modules /pydra/pydra/ -k 'not test_audit_prov and not test_audit_prov_messdir_1 and not test_audit_prov_messdir_2 and not test_audit_prov_wf and not test_audit_all'"
      - name: Upload to codecov
        run: |
          docker exec slurm bash -c "pip install urllib3==1.26.6"
          docker exec slurm bash -c "codecov --root /pydra -f /pydra/cov.xml -F unittests"
          docker rm -f slurm
26 changes: 26 additions & 0 deletions pydra/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@

def pytest_addoption(parser):
    """Register pydra-specific command-line options with pytest.

    Parameters
    ----------
    parser :
        The pytest option parser to extend.
    """
    # Boolean flag: when given, dask is added to the plugin matrix.
    parser.addoption("--dask", action="store_true", help="run all combinations")
    # String option restricted to the two supported PSI/J backends.
    parser.addoption(
        "--psij",
        action="store",
        choices=["local", "slurm"],
        help="run with psij subtype plugin",
    )


def pytest_generate_tests(metafunc):
Expand All @@ -21,6 +27,16 @@ def pytest_generate_tests(metafunc):
except ValueError:
# Called as --pyargs, so --dask isn't available
pass
try:
if metafunc.config.getoption("psij"):
Plugins.append("psij-" + metafunc.config.getoption("psij"))
if (
bool(shutil.which("sbatch"))
and metafunc.config.getoption("psij") == "slurm"
):
Plugins.remove("slurm")
except ValueError:
pass
metafunc.parametrize("plugin_dask_opt", Plugins)

if "plugin" in metafunc.fixturenames:
Expand All @@ -35,6 +51,16 @@ def pytest_generate_tests(metafunc):
Plugins = ["slurm"]
else:
Plugins = ["cf"]
try:
if metafunc.config.getoption("psij"):
Plugins.append("psij-" + metafunc.config.getoption("psij"))
if (
bool(shutil.which("sbatch"))
and metafunc.config.getoption("psij") == "slurm"
):
Plugins.remove("slurm")
except ValueError:
pass
metafunc.parametrize("plugin", Plugins)


Expand Down
31 changes: 31 additions & 0 deletions pydra/engine/run_pickled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pickle
import sys
from pydra.engine.helpers import load_and_run


def run_pickled(*file_paths, rerun=False):
    """Unpickle the object(s) stored at ``file_paths`` and execute them.

    With one path, the pickled object is assumed to be a callable and is
    invoked as ``obj(rerun=rerun)``.  With two paths, the two pickled objects
    are forwarded to ``load_and_run`` as ``(task_pkl, ind)``.

    Parameters
    ----------
    *file_paths : str or os.PathLike
        One or two paths to pickle files.
    rerun : bool, optional
        Propagated to the executed task; defaults to False.

    Returns
    -------
    object
        The result of the executed object (also printed to stdout).

    Raises
    ------
    ValueError
        If the number of paths is not 1 or 2.
    """
    loaded_objects = []

    for file_path in file_paths:
        with open(file_path, "rb") as file:
            loaded_objects.append(pickle.load(file))

    if len(loaded_objects) == 1:
        result = loaded_objects[0](rerun=rerun)
    elif len(loaded_objects) == 2:
        result = load_and_run(loaded_objects[0], loaded_objects[1], rerun=rerun)
    else:
        raise ValueError("Unsupported number of loaded objects")

    # Keep the stdout side effect (the parent process scrapes it), but also
    # return the result so programmatic callers can use it.
    print(f"Result: {result}")
    return result


if __name__ == "__main__":
rerun = False # Default value for rerun
file_paths = sys.argv[1:]

if "--rerun" in file_paths:
rerun = True
file_paths.remove("--rerun")

run_pickled(*file_paths, rerun=rerun)
2 changes: 1 addition & 1 deletion pydra/engine/tests/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4092,7 +4092,7 @@ def test_wf_lzoutall_st_2a(plugin, tmpdir):
wf.plugin = plugin
wf.cache_dir = tmpdir

with Submitter(plugin="cf") as sub:
with Submitter(plugin=plugin) as sub:
sub(wf)

assert wf.output_dir.exists()
Expand Down
148 changes: 148 additions & 0 deletions pydra/engine/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,10 +892,158 @@ def close(self):
pass


class PsijWorker(Worker):
    """A worker to execute tasks using PSI/J (Portable Submission Interface for Jobs).

    Tasks are pickled to the task's cache directory and executed by launching
    ``run_pickled.py`` as a subprocess through a PSI/J job executor
    ("local" or "slurm").
    """

    def __init__(self, subtype, **kwargs):
        """
        Initialize PsijWorker.

        Parameters
        ----------
        subtype : str
            Scheduler for PSI/J.  Must be ``"local"`` or ``"slurm"``.

        Raises
        ------
        ImportError
            If the ``psij`` package is not installed.
        ValueError
            If ``subtype`` is not a supported scheduler.
        """
        # NOTE(review): **kwargs is accepted but never used — presumably for
        # signature compatibility with other Worker subclasses; confirm.
        try:
            import psij
        except ImportError:
            logger.critical("Please install psij.")
            raise
        logger.debug("Initialize PsijWorker")
        # Keep a module handle so other methods can build psij objects lazily.
        self.psij = psij

        # Check if the provided subtype is valid
        valid_subtypes = ["local", "slurm"]
        if subtype not in valid_subtypes:
            raise ValueError(
                f"Invalid 'subtype' provided. Available options: {', '.join(valid_subtypes)}"
            )

        self.subtype = subtype

    def run_el(self, interface, rerun=False, **kwargs):
        """Run a task.

        Returns the ``exec_psij`` coroutine; the caller is expected to await it.
        """
        return self.exec_psij(interface, rerun=rerun)

    def make_spec(self, cmd=None, arg=None):
        """
        Create a PSI/J job specification.

        Parameters
        ----------
        cmd : str, optional
            Executable command. Defaults to None.
        arg : list, optional
            List of arguments. Defaults to None.

        Returns
        -------
        psij.JobSpec
            PSI/J job specification.
        """
        spec = self.psij.JobSpec()
        spec.executable = cmd
        spec.arguments = arg

        return spec

    def make_job(self, spec, attributes):
        """
        Create a PSI/J job.

        Parameters
        ----------
        spec : psij.JobSpec
            PSI/J job specification.
        attributes : any
            Job attributes.  NOTE(review): currently ignored — the job is
            built from ``spec`` only.

        Returns
        -------
        psij.Job
            PSI/J job.
        """
        job = self.psij.Job()
        job.spec = spec
        return job

    async def exec_psij(self, runnable, rerun=False):
        """
        Run a task (coroutine wrapper).

        Pickles ``runnable`` into its cache directory, submits a PSI/J job
        that executes ``run_pickled.py`` on those pickle files, waits for
        completion, and treats any stderr output as failure.

        Parameters
        ----------
        runnable : TaskBase or tuple
            Either a task object, or a ``(ind, task_main_pkl, task_orig)``
            tuple for state/index-expanded execution.
        rerun : bool, optional
            Forwarded to the subprocess as a ``--rerun`` flag.

        Raises
        ------
        Exception
            If stderr is not empty.

        Returns
        -------
        None
        """
        import pickle
        from pathlib import Path

        jex = self.psij.JobExecutor.get_instance(self.subtype)
        # Directory containing this module, used to locate run_pickled.py.
        absolute_path = Path(__file__).parent

        # NOTE(review): TaskBase is expected to be defined at module level in
        # this file (not imported here).
        if isinstance(runnable, TaskBase):
            cache_dir = runnable.cache_dir
            file_path = cache_dir / "runnable_function.pkl"
            with open(file_path, "wb") as file:
                pickle.dump(runnable._run, file)
            func_path = absolute_path / "run_pickled.py"
            spec = self.make_spec("python", [func_path, file_path])
        else:  # it could be tuple that includes pickle files with tasks and inputs
            cache_dir = runnable[-1].cache_dir
            file_path_1 = cache_dir / "taskmain.pkl"
            file_path_2 = cache_dir / "ind.pkl"
            ind, task_main_pkl, task_orig = runnable
            with open(file_path_1, "wb") as file:
                pickle.dump(task_main_pkl, file)
            with open(file_path_2, "wb") as file:
                pickle.dump(ind, file)
            func_path = absolute_path / "run_pickled.py"
            spec = self.make_spec(
                "python",
                [
                    func_path,
                    file_path_1,
                    file_path_2,
                ],
            )

        if rerun:
            spec.arguments.append("--rerun")

        # Subprocess output is captured in the cache dir; stderr is inspected
        # below to decide success/failure.
        spec.stdout_path = cache_dir / "demo.stdout"
        spec.stderr_path = cache_dir / "demo.stderr"

        job = self.make_job(spec, None)
        jex.submit(job)
        # NOTE(review): job.wait() blocks the thread inside this coroutine,
        # stalling the event loop while the job runs — consider an async wait
        # or running the wait in an executor.
        job.wait()

        # Any stderr output (even warnings) is treated as a hard failure.
        if spec.stderr_path.stat().st_size > 0:
            with open(spec.stderr_path, "r") as stderr_file:
                stderr_contents = stderr_file.read()
            raise Exception(
                f"stderr_path '{spec.stderr_path}' is not empty. Contents:\n{stderr_contents}"
            )

        return

    def close(self):
        """Finalize the internal pool of tasks."""
        # No persistent resources are held, so nothing to clean up.
        pass


# Registry mapping plugin names to worker factories.  Most values are the
# worker classes themselves; the psij entries are zero-argument factories
# that bind the PSI/J scheduler subtype before constructing the worker.
WORKERS = {
    "serial": SerialWorker,
    "cf": ConcurrentFuturesWorker,
    "slurm": SlurmWorker,
    "dask": DaskWorker,
    "sge": SGEWorker,
    "psij-local": lambda: PsijWorker(subtype="local"),
    "psij-slurm": lambda: PsijWorker(subtype="slurm"),
}
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ classifiers = [
dynamic = ["version"]

[project.optional-dependencies]
psij = [
"psij-python",
]
dask = [
"dask",
"distributed",
Expand Down

0 comments on commit 0e3d33c

Please sign in to comment.