ENH: use joblib instead of multiprocessing #227

Merged: 7 commits, Jul 11, 2021

fissa/core.py (172 changes: 53 additions & 119 deletions)

@@ -13,20 +13,19 @@
 import functools
 import glob
 import itertools
-import multiprocessing
 import os.path
 import sys
 import warnings
 
-import tqdm
-from past.builtins import basestring
-
 try:
     from collections import abc
 except ImportError:
     import collections as abc
 
 import numpy as np
+import tqdm
+from joblib import Parallel, delayed
+from past.builtins import basestring
 from scipy.io import savemat
 
 from . import deltaf, extraction
@@ -221,17 +220,6 @@ def separate_trials(
     return Xsep, Xmatch, Xmixmat, convergence
 
 
-if sys.version_info < (3, 0):
-    # Define helper functions which are needed on Python 2.7, which does not
-    # have multiprocessing.Pool.starmap.
-
-    def _extract_wrapper(args):
-        return extract(*args)
-
-    def _separate_wrapper(args):
-        return separate_trials(*args)
-
-
 class Experiment:
     r"""
     FISSA Experiment.
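
Why the helper wrappers above could be dropped entirely: `multiprocessing.Pool.map` hands each worker a single argument, so Python 2 (which lacks `Pool.starmap`) needed these shims to unpack argument tuples. joblib's `delayed` captures positional and keyword arguments directly, on any Python version. A minimal sketch of the correspondence, using a hypothetical two-argument worker `add` rather than FISSA's own functions:

```python
import multiprocessing

from joblib import Parallel, delayed


def add(x, y):
    return x + y


def _add_wrapper(args):
    # Python 2 style shim: unpack the single tuple argument by hand.
    return add(*args)


pairs = [(1, 2), (3, 4), (5, 6)]

if __name__ == "__main__":
    # Old idiom: Pool.map over a wrapper (or Pool.starmap on Python 3).
    with multiprocessing.Pool(2) as pool:
        old = pool.map(_add_wrapper, pairs)

    # New idiom: delayed(add)(x, y) records the call; Parallel executes it.
    new = Parallel(n_jobs=2)(delayed(add)(x, y) for x, y in pairs)

    assert old == new == [3, 7, 11]
```
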
@@ -301,28 +289,30 @@ class Experiment:
 
         .. versionadded:: 1.0.0
 
-    ncores_preparation : int or None, default=None
+    ncores_preparation : int or None, default=-1
         The number of parallel subprocesses to use during the data
         preparation steps of :meth:`separation_prep`.
         These are ROI and neuropil subregion definitions, and extracting
         raw signals from TIFFs.
 
-        If set to ``None`` (default), the number of processes used will
-        equal the number of threads on the machine. Note that this
-        behaviour can, especially for the data preparation step,
+        If set to ``None`` or ``-1`` (default), the number of processes used
+        will equal the number of threads on the machine.
+        If this is set to ``-2``, the number of processes used will be one less
+        than the number of threads on the machine; etc.
+        Note that this behaviour can, especially for the data preparation step,
         be very memory-intensive.
 
-    ncores_separation : int or None, default=None
+    ncores_separation : int or None, default=-1
         The number of parallel subprocesses to use during the signal
         separation steps of :meth:`separate`.
-        The separation steps require less memory per subprocess than
-        the preparation steps, and so can often be set higher than
-        `ncores_preparation`.
+        The separation steps require less memory per subprocess than
+        the preparation steps, and so can often be set higher than
+        `ncores_preparation`.
 
-        If set to ``None`` (default), the number of processes used will
-        equal the number of threads on the machine. Note that this
-        behaviour can, especially for the data preparation step,
-        be very memory-intensive.
+        If set to ``None`` or ``-1`` (default), the number of processes used
+        will equal the number of threads on the machine.
+        If this is set to ``-2``, the number of processes used will be one less
+        than the number of threads on the machine; etc.
 
     method : "nmf" or "ica", default="nmf"
         Which blind source-separation method to use. Either ``"nmf"``
@@ -492,8 +482,8 @@ def __init__(
         max_iter=20000,
         tol=1e-4,
         max_tries=1,
-        ncores_preparation=None,
-        ncores_separation=None,
+        ncores_preparation=-1,
+        ncores_separation=-1,
         method="nmf",
         lowmemory_mode=False,
         datahandler=None,
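
For reference, the new ``-1`` defaults rely on joblib's convention that negative ``n_jobs`` values count backwards from the number of CPU cores. joblib exposes the resolved worker count through `joblib.effective_n_jobs`; a quick sketch (the printed values depend on the machine):

```python
from joblib import effective_n_jobs

print(effective_n_jobs(-1))  # one worker per core, e.g. 8 on an 8-core machine
print(effective_n_jobs(-2))  # all cores but one, e.g. 7 on an 8-core machine
print(effective_n_jobs(1))   # a single worker, i.e. no parallelism
```
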
@@ -761,55 +751,17 @@ def separation_prep(self, redo=False):
                 datahandler=self.datahandler,
             )
 
-        # Check whether we should use multiprocessing
-        use_multiprocessing = (
-            self.ncores_preparation is None or self.ncores_preparation > 1
-        )
-
         # check whether we should show progress bars
         disable_progressbars = self.verbosity != 1
 
-        # Do the extraction
-        if use_multiprocessing and sys.version_info < (3, 0):
-            # define pool
-            pool = multiprocessing.Pool(self.ncores_preparation)
-            # run extraction
-            outputs = list(
-                pool.map(
-                    _extract_wrapper,
-                    tqdm.tqdm(
-                        zip(
-                            self.images,
-                            self.rois,
-                            itertools.repeat(self.nRegions, len(self.images)),
-                            itertools.repeat(self.expansion, len(self.images)),
-                            itertools.repeat(self.datahandler, len(self.images)),
-                        ),
-                        total=self.nTrials,
-                        desc="Extracting traces",
-                        disable=disable_progressbars,
-                    ),
-                )
-            )
-            pool.close()
-            pool.join()
-
-        elif use_multiprocessing:
-            with multiprocessing.Pool(self.ncores_preparation) as pool:
-                # run extraction
-                outputs = list(
-                    pool.starmap(
-                        _extract_cfg,
-                        tqdm.tqdm(
-                            zip(self.images, self.rois),
-                            total=self.nTrials,
-                            desc="Extracting traces",
-                            disable=disable_progressbars,
-                        ),
-                    )
-                )
-        else:
+        # Check how many workers to spawn.
+        # Map the behaviour of ncores=None to one job per CPU core, like for
+        # multiprocessing.Pool(processes=None). With joblib, this is
+        # joblib.Parallel(n_jobs=-1) instead.
+        n_jobs = -1 if self.ncores_preparation is None else self.ncores_preparation
+
+        if 0 <= n_jobs <= 1:
             # Don't use multiprocessing
             outputs = [
                 _extract_cfg(*args)
                 for args in tqdm.tqdm(
@@ -819,6 +771,17 @@ def separation_prep(self, redo=False):
                     disable=disable_progressbars,
                 )
             ]
+        else:
+            # Use multiprocessing
+            outputs = Parallel(n_jobs=n_jobs, backend="threading")(
+                delayed(_extract_cfg)(image, roi_stack)
+                for image, roi_stack in tqdm.tqdm(
+                    zip(self.images, self.rois),
+                    total=self.nTrials,
+                    desc="Extracting traces",
+                    disable=disable_progressbars,
+                )
+            )
 
         # get number of cells
         nCell = len(outputs[0][1])
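
The new parallel branch above follows the standard joblib pattern: `Parallel` consumes a generator of `delayed` calls, and `tqdm` wraps the input iterable so the progress bar survives the rewrite. A minimal standalone sketch with a hypothetical worker `work` (the bar tracks task dispatch rather than completion, but joblib only pre-dispatches a small buffer of tasks, so it remains a reasonable progress proxy):

```python
import tqdm
from joblib import Parallel, delayed


def work(n):
    return n * n


items = list(range(100))

# Parallel consumes the generator of delayed calls; wrapping tqdm around
# the input iterable advances the bar as tasks are handed to workers.
results = Parallel(n_jobs=4, backend="threading")(
    delayed(work)(n) for n in tqdm.tqdm(items, desc="Working")
)
assert results == [n * n for n in items]
```
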
@@ -955,58 +918,18 @@ def separate(self, redo_prep=False, redo_sep=False):
                 verbosity=self.verbosity - 2,
             )
 
-        # Check whether we should use multiprocessing
-        use_multiprocessing = (
-            self.ncores_separation is None or self.ncores_separation > 1
-        )
-
         # check whether we should show progress bars
        disable_progressbars = self.verbosity != 1
 
+        # Check how many workers to spawn.
+        # Map the behaviour of ncores=None to one job per CPU core, like for
+        # multiprocessing.Pool(processes=None). With joblib, this is
+        # joblib.Parallel(n_jobs=-1) instead.
+        n_jobs = -1 if self.ncores_separation is None else self.ncores_separation
+
         # Do the extraction
-        if use_multiprocessing and sys.version_info < (3, 0):
-            # define pool
-            pool = multiprocessing.Pool(self.ncores_separation)
-
-            # run separation
-            outputs = list(
-                pool.map(
-                    _separate_wrapper,
-                    tqdm.tqdm(
-                        zip(
-                            self.raw,
-                            range(n_roi),
-                            itertools.repeat(self.alpha, n_roi),
-                            itertools.repeat(self.max_iter, n_roi),
-                            itertools.repeat(self.tol, n_roi),
-                            itertools.repeat(self.max_tries, n_roi),
-                            itertools.repeat(self.method, n_roi),
-                            itertools.repeat(self.verbosity, n_roi),
-                        ),
-                        total=self.nCell,
-                        desc="Separating data",
-                        disable=disable_progressbars,
-                    ),
-                )
-            )
-            pool.close()
-            pool.join()
-
-        elif use_multiprocessing:
-            with multiprocessing.Pool(self.ncores_separation) as pool:
-                # run separation
-                outputs = list(
-                    pool.starmap(
-                        _separate_cfg,
-                        tqdm.tqdm(
-                            zip(self.raw, range(n_roi)),
-                            total=self.nCell,
-                            desc="Separating data",
-                            disable=disable_progressbars,
-                        ),
-                    )
-                )
-        else:
+        if 0 <= n_jobs <= 1:
             # Don't use multiprocessing
             outputs = [
                 _separate_cfg(X, roi_label=i)
                 for i, X in tqdm.tqdm(
@@ -1016,6 +939,17 @@ def separate(self, redo_prep=False, redo_sep=False):
                     disable=disable_progressbars,
                 )
             ]
+        else:
+            # Use multiprocessing
+            outputs = Parallel(n_jobs=n_jobs, backend="threading")(
+                delayed(_separate_cfg)(X, i)
+                for i, X in tqdm.tqdm(
+                    enumerate(self.raw),
+                    total=self.nCell,
+                    desc="Separating data",
+                    disable=disable_progressbars,
+                )
+            )
 
         # Define output shape as an array of objects shaped (n_roi, n_trial)
         sep = np.empty((n_roi, n_trial), dtype=object)
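
One small asymmetry worth noting in the hunk above: the serial branch calls `_separate_cfg(X, roi_label=i)` by keyword, while the parallel branch passes `i` positionally. `delayed` forwards keyword arguments unchanged, so the two call forms could be made identical. A sketch with a hypothetical worker `describe`:

```python
from joblib import Parallel, delayed


def describe(x, roi_label=None):
    return (roi_label, x)


data = ["a", "b", "c"]

# delayed() forwards keyword arguments, so the parallel call can mirror
# the serial `_separate_cfg(X, roi_label=i)` form exactly.
out = Parallel(n_jobs=2, backend="threading")(
    delayed(describe)(x, roi_label=i) for i, x in enumerate(data)
)
assert out == [(0, "a"), (1, "b"), (2, "c")]
```
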
requirements.txt (1 change: 1 addition & 0 deletions)

@@ -1,4 +1,5 @@
 future>=0.16.0
+joblib>=0.14.1
 numpy>=1.13.0
 Pillow>=4.3.0
 read-roi>=1.5.0; python_version>='3.0'