Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve LISI multiprocessing specification #301

Merged
merged 17 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scib/knn_graph/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.o
LuckyMD marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion scib/knn_graph/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
The program in `knn_graph.cpp` was written by Thomas Neumann and adapted by Maren Büttner for graph-LISI.
Manual compilation:
```
$ g++ -std=c++11 -O3 knn_graph.cpp -o knn_graph.o
g++ -std=c++11 -O3 knn_graph.cpp -o knn_graph.o
```
A more modern version with `C++17` features is planned.
6 changes: 3 additions & 3 deletions scib/knn_graph/knn_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ int main(int argc, char* argv[]) {
unsigned limit = matrix.getRowCount();
unsigned len_ch;
if (n_chunks <= 1){
n_chunks = 0;
n_chunks = 1;
len_ch = limit;
}
else{
len_ch = limit/n_chunks;
len_ch = limit / (n_chunks - 1);
mumichae marked this conversation as resolved.
Show resolved Hide resolved
}
//get percentage to which should be subsampled
unsigned sub = stoi(argv[5]);
Expand All @@ -384,7 +384,7 @@ int main(int argc, char* argv[]) {
unsigned lower;
unsigned upper;

for (unsigned n_ch = 0; n_ch <= n_chunks; ++n_ch){
for (unsigned n_ch = 0; n_ch < n_chunks; ++n_ch){
// Find the top k elements for all nodes. Computes the sum of all the weights, just to have some result to show
// write all neighbors and weights to two files
dist = output_prefix + "_distances_" + to_string(n_ch) + ".txt";
Expand Down
Binary file modified scib/knn_graph/knn_graph.o
Binary file not shown.
107 changes: 44 additions & 63 deletions scib/metrics/lisi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from deprecated import deprecated
from scipy.io import mmwrite

import scib
from ..exceptions import RLibraryNotFound
from ..utils import check_adata, check_batch

Expand Down Expand Up @@ -54,8 +55,7 @@ def ilisi_graph(
type_=None,
subsample=None,
scale=True,
multiprocessing=None,
nodes=None,
n_cores=1,
verbose=False
):
"""Integration LISI (iLISI) score
Expand All @@ -75,10 +75,7 @@ def ilisi_graph(
:param subsample: Percentage of observations (integer between 0 and 100)
to which lisi scoring should be subsampled
:param scale: scale output values between 0 and 1 (True/False)
:param multiprocessing: parallel computation of LISI scores, if None, no parallelisation
via multiprocessing is performed
:param nodes: number of nodes (i.e. CPUs to use for multiprocessing); ignored, if
multiprocessing is set to None
:param n_cores: number of cores (i.e. CPUs or CPU cores to use for multiprocessing)
:return: Median of iLISI scores per batch labels
"""

Expand All @@ -92,8 +89,7 @@ def ilisi_graph(
n_neighbors=k0,
perplexity=None,
subsample=subsample,
multiprocessing=multiprocessing,
nodes=nodes,
n_cores=n_cores,
verbose=verbose
)

Expand All @@ -115,8 +111,7 @@ def clisi_graph(
type_=None,
subsample=None,
scale=True,
multiprocessing=None,
nodes=None,
n_cores=1,
verbose=False
):
"""Cell-type LISI (cLISI) score
Expand All @@ -137,10 +132,7 @@ def clisi_graph(
:param subsample: Percentage of observations (integer between 0 and 100)
to which lisi scoring should be subsampled
:param scale: scale output values between 0 and 1 (True/False)
:param multiprocessing: parallel computation of LISI scores, if None, no parallelisation
via multiprocessing is performed
:param nodes: number of nodes (i.e. CPUs to use for multiprocessing); ignored, if
multiprocessing is set to None
:param n_cores: number of cores (i.e. CPUs or CPU cores to use for multiprocessing)
:return: Median of cLISI scores per cell type labels
"""

Expand All @@ -156,8 +148,7 @@ def clisi_graph(
n_neighbors=k0,
perplexity=None,
subsample=subsample,
multiprocessing=multiprocessing,
nodes=nodes,
n_cores=n_cores,
verbose=verbose
)

Expand Down Expand Up @@ -191,8 +182,7 @@ def lisi_graph_py(
n_neighbors=90,
perplexity=None,
subsample=None,
multiprocessing=None,
nodes=None,
n_cores=1,
verbose=False
):
"""
Expand All @@ -201,6 +191,9 @@ def lisi_graph_py(
By default, perplexity is chosen as 1/3 * number of nearest neighbours in the knn-graph.
"""

# use no more than the available cores
n_cores = max(1, min(n_cores, mp.cpu_count()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous implementation of the multiprocessing, I have taken all but one CPU for the process to be a little less greedy. But when I checked my previous implementation, it is apparently not doing what the comment said but taking all CPUs. So please ignore.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right, I was adapting it to have an upper bound and initially used n_cpu-1, before realising that the code will only run in single core on github action, since the machine only has 2 cores. So the lower bound is redundant, will remove.


if 'neighbors' not in adata.uns:
raise AttributeError(f"key 'neighbors' not found. Please make sure that a " +
"kNN graph has been computed")
Expand Down Expand Up @@ -239,38 +232,32 @@ def lisi_graph_py(
print(connectivities.data[large_enough == False])
connectivities.data[large_enough == False] = 3e-308

# define number of chunks
n_chunks = 1

if multiprocessing is not None:
# set up multiprocessing
if nodes is None:
# take all but one CPU and 1 CPU, if there's only 1 CPU.
n_cpu = mp.cpu_count()
n_processes = np.max([n_cpu, np.ceil(n_cpu / 2)]).astype('int')
else:
n_processes = nodes
# update numbr of chunks
n_chunks = n_processes

# temporary file
tmpdir = tempfile.TemporaryDirectory(prefix="lisi_")
dir_path = tmpdir.name + '/'
mtx_file_path = dir_path + 'input.mtx'
print(mtx_file_path, dir_path)
prefix = tmpdir.name + '/graph_lisi'
mtx_file_path = prefix + '_input.mtx'

mmwrite(
mtx_file_path,
connectivities,
symmetry='general'
)
# call knn-graph computation in Cpp

root = pathlib.Path(__file__).parent.parent # get current root directory
root = pathlib.Path(scib.__file__).parent # get current root directory
cpp_file_path = root / 'knn_graph/knn_graph.o' # create POSIX path to file to execute compiled cpp-code
# comment: POSIX path needs to be converted to string - done below with 'as_posix()'
# create evenly split chunks if n_obs is divisible by n_chunks (doesn't really make sense on 2nd thought)
n_splits = n_chunks - 1
args_int = [cpp_file_path.as_posix(), mtx_file_path, dir_path, str(n_neighbors), str(n_splits), str(subset)]
args_int = [
cpp_file_path.as_posix(),
mtx_file_path,
prefix,
str(n_neighbors),
str(n_cores), # number of splits
str(subset)
]
if verbose:
print(f'call {" ".join(args_int)}')
try:
subprocess.run(args_int)
except Exception as e:
Expand All @@ -281,48 +268,42 @@ def lisi_graph_py(
if verbose:
print("LISI score estimation")

# do the simpson call
if multiprocessing is not None:
if n_cores > 1:

if verbose:
print(f"{n_processes} processes started.")
pool = mp.Pool(processes=n_processes)
count = np.arange(0, n_processes)
print(f"{n_cores} processes started.")
pool = mp.Pool(processes=n_cores)
chunk_no = np.arange(0, n_cores)

# create argument list for each worker
results = pool.starmap(
compute_simpson_index_graph,
zip(itertools.repeat(dir_path),
zip(
itertools.repeat(prefix),
itertools.repeat(batch),
itertools.repeat(n_batches),
itertools.repeat(n_neighbors),
itertools.repeat(perplexity),
count)
chunk_no
)
)
pool.close()
pool.join()

simpson_est_batch = 1 / np.concatenate(results)
simpson_estimate_batch = np.concatenate(results)

else:
simpson_estimate_batch = compute_simpson_index_graph(
input_path=dir_path,
file_prefix=prefix,
batch_labels=batch,
n_batches=n_batches,
perplexity=perplexity,
n_neighbors=n_neighbors,
chunk_no=None
n_neighbors=n_neighbors
)
simpson_est_batch = 1 / simpson_estimate_batch

tmpdir.cleanup()

# extract results
d = {batch_key: simpson_est_batch}

lisi_estimate = pd.DataFrame(data=d, index=np.arange(0, len(simpson_est_batch)))

return lisi_estimate
return 1 / simpson_estimate_batch


# LISI core functions
Expand Down Expand Up @@ -402,7 +383,7 @@ def compute_simpson_index(


def compute_simpson_index_graph(
input_path=None,
file_prefix=None,
batch_labels=None,
n_batches=None,
n_neighbors=90,
Expand All @@ -413,7 +394,7 @@ def compute_simpson_index_graph(
"""
Simpson index of batch labels subset by group.

:param input_path: file_path to pre-computed index and distance files
:param file_prefix: file_path to pre-computed index and distance files
:param batch_labels: a vector of length n_cells with batch info
:param n_batches: number of unique batch labels
:param n_neighbors: number of nearest neighbors
Expand All @@ -422,24 +403,24 @@ def compute_simpson_index_graph(
:param tol: a tolerance for testing effective neighborhood size
:returns: the simpson index for the neighborhood of each cell
"""
index_file = file_prefix + '_indices_' + str(chunk_no) + '.txt'
distance_file = file_prefix + '_distances_' + str(chunk_no) + '.txt'

# initialize
P = np.zeros(n_neighbors)
logU = np.log(perplexity)

if chunk_no is None:
chunk_no = 0
# check if the target file is not empty
if os.stat(input_path + '_indices_' + str(chunk_no) + '.txt').st_size == 0:
if os.stat(index_file).st_size == 0:
print("File has no entries. Doing nothing.")
lists = np.zeros(0)
return lists

# read distances and indices with nan value handling
indices = pd.read_table(input_path + '_indices_' + str(chunk_no) + '.txt', index_col=0, header=None, sep=',')
indices = pd.read_table(index_file, index_col=0, header=None, sep=',')
indices = indices.T

distances = pd.read_table(input_path + '_distances_' + str(chunk_no) + '.txt', index_col=0, header=None, sep=',')
distances = pd.read_table(distance_file, index_col=0, header=None, sep=',')
distances = distances.T

# get cell ids
Expand Down
6 changes: 4 additions & 2 deletions scib/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def metrics(
ilisi_=False,
clisi_=False,
subsample=0.5,
n_cores=1,
type_=None,
verbose=False,
):
Expand Down Expand Up @@ -293,6 +294,7 @@ def metrics(
whether to compute iLISI using :func:`~scib.metrics.ilisi_graph`
:param subsample:
subsample fraction for LISI scores
:param n_cores: number of cores to be used for LISI functions
:param `type_`:
one of 'full', 'embed' or 'knn' (used for kBET and LISI scores)
"""
Expand Down Expand Up @@ -456,7 +458,7 @@ def metrics(
type_=type_,
subsample=subsample * 100,
scale=True,
multiprocessing=True,
n_cores=n_cores,
verbose=verbose
)
else:
Expand All @@ -470,7 +472,7 @@ def metrics(
type_=type_,
subsample=subsample * 100,
scale=True,
multiprocessing=True,
n_cores=n_cores,
verbose=verbose
)
else:
Expand Down
12 changes: 12 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
import subprocess
import sys

from setuptools import setup

# Best-effort build of the knn_graph C++ helper used by the LISI metric.
# A failed compile (e.g. no g++ available) is deliberately non-fatal so the
# pure-Python part of the package can still be installed.
compile_cmd = 'g++ -std=c++11 -O3 scib/knn_graph/knn_graph.cpp -o scib/knn_graph/knn_graph.o'
sys.stdout.write('Compile knn_graph C++ code for LISI metric...\n')
sys.stdout.flush()
try:
    # check_output with stderr merged so the compiler's diagnostics are
    # captured in err.output on failure.
    subprocess.check_output(compile_cmd, stderr=subprocess.STDOUT, shell=True, universal_newlines=True)
except subprocess.CalledProcessError as err:
    sys.stdout.write(f'Failed to compile knn_graph for LISI - skipping...\n{err.returncode}\n{err.output}')
    sys.stdout.flush()

setup()
Loading