simulate_for_sbi not saturating CPU #1175

Closed
janko-petkovic opened this issue Jun 15, 2024 · 11 comments · Fixed by #1188
Labels
question Further information is requested

Comments

@janko-petkovic
Contributor

janko-petkovic commented Jun 15, 2024

Dear devs,

I have run into this unexpected behaviour while running the sbi.inference.simulate_for_sbi method.

It appears that, in a multi-processing scenario, the per-core saturation is capped at a value that decreases as a function of the number of simulations to be produced. This inevitably leads to increased simulation times.

I have tried to circumvent the sbi.inference.simulate_for_sbi method both by manually implementing multi-processing (with the straightforward Python multiprocessing.Pool) and with the p_map method from the p_tqdm library. The latter has the advantage of being extremely easy to use and of natively supporting tqdm progress bars. Both of these options correctly saturate all the CPUs involved in the computation.

I wonder if this behaviour stems from the choice of joblib as the multiprocessing library, and if a switch to p_tqdm could improve the performance of the sbi library.

Below, I provide some code to test this behaviour. The requirements are the source version of sbi as well as the pip version of p_tqdm. The code contains three benchmarking methods (vanilla Pool, p_map and simulate_for_sbi) as well as two handles for the number of processes (workers) to be used and the number of simulations to be generated.

Judging by the ETA, the runtime difference between simulate_for_sbi and p_map is negligible for a low number of simulations (e.g. 100) but grows with it, reaching a factor-of-10 difference for num_simulations equal to 1M.

I hope this report is helpful, especially since the simulation runtime represents a bottleneck in many common use cases.

from multiprocessing import Pool
from time import time
from datetime import datetime, timedelta

import numpy as np
import torch

from p_tqdm import p_map
from sbi.utils import BoxUniform
from sbi.utils.user_input_checks import process_simulator, process_prior
from sbi.inference import simulate_for_sbi



def timeit(func):
    '''Decorator method for printing the execution time'''

    def wrapper(*args, **kwargs):

        print(f'\n > Generation started:\t{datetime.now().strftime("%H:%M:%S")}')
        beg = time()

        func(*args, **kwargs)

        end = time()
        print(f' > Generation complete:\t{datetime.now().strftime("%H:%M:%S")}')

        tds = str(timedelta(seconds=end-beg)).split(':')
        print(f' > Total runtime:\t{tds[0]}h {tds[1]}min {tds[2]:.4}s')

    return wrapper



def simulation(theta):
    '''Some generic simulation function.
    You can change this to study, e.g. the impact of numpy's multithreading
    on the code's multiprocessing - np.linalg.eig
    '''

    # matrix = np.random.rand(200,200)/theta
    # eigval, eigvec = np.linalg.eig(matrix)
    for _ in range(100):
        np.random.randn(10000)

    return 1



@timeit
def run_simulations_pool(prior, num_simulations, num_processes):
    '''Generates the joint using python's native Pool multiprocessing'''

    theta_low = prior.base_dist.low.cpu().numpy()
    theta_high = prior.base_dist.high.cpu().numpy()
    theta_range = theta_high - theta_low

    thetas = np.random.uniform(size=(num_simulations,1)) * theta_range + theta_low

    with Pool(num_processes) as p:
        x = p.map(simulation, thetas)


@timeit
def run_simulations_p_map(prior, num_simulations, num_processes):
    '''Generates the joint using p_map from the p_tqdm library'''

    theta_low = prior.base_dist.low.cpu().numpy()
    theta_high = prior.base_dist.high.cpu().numpy()
    theta_range = theta_high - theta_low

    thetas = np.random.uniform(size=(num_simulations,1)) * theta_range + theta_low

    x = p_map(simulation, thetas, num_cpus=num_processes)


@timeit
def run_simulations_sim_for_sbi(prior, num_simulations, num_processes):
    '''Generates the joint using sbi.inference.simulate_for_sbi'''

    prior, num_parameters, prior_returns_numpy = process_prior(prior)
    simulator_wrapper = process_simulator(simulation, prior, prior_returns_numpy)

    theta, x = simulate_for_sbi(simulator_wrapper, 
                                prior, 
                                num_simulations, 
                                num_processes)


if __name__ == '__main__':

    prior = BoxUniform(low=torch.tensor([0.]), 
                       high=torch.tensor([10.]), 
                       device='cpu')

    num_simulations = 1000000
    num_processes = 16

    # Uncomment the benchmark that you want to run.
    # The native Pool benchmark serves as a ground-truth reference.

    # print('Benchmarking: sbi.inference.simulate_for_sbi')
    # run_simulations_sim_for_sbi(prior, num_simulations, num_processes)

    # print('Benchmarking: p_map (p_tqdm)')
    # run_simulations_p_map(prior, num_simulations, num_processes)

    # print('Benchmarking: native Pool')
    # run_simulations_pool(prior, num_simulations, num_processes)
@janfb
Contributor

janfb commented Jun 17, 2024

Dear @janko-petkovic

Thanks a lot for looking into this and providing the benchmark. I can confirm your findings. This is indeed a very interesting behavior of simulate_for_sbi. The "embarrassingly parallel" for loops we used here seem to be embarrassingly slow 😬

I think the reason could be the progress bar wrappers we use to show tqdm progress bars. They seem to be blocking the underlying processes, leading to the reduced CPU usage. Combining joblib with tqdm has been an issue for some time, but there have been some improvements to joblib recently which make it possible to combine the two more elegantly (and performantly). It is now possible to return the Parallel object as a generator, which can be passed to tqdm directly, see joblib/joblib#972 (comment).

Indeed, if I add the following minimal joblib function to the benchmark I get the same performance as with p_tqdm:

from joblib import Parallel, delayed
from tqdm import tqdm


@timeit
def run_simulations_joblib(prior, num_simulations, num_processes):
    '''Generates the joint using joblib'''

    theta_low = prior.base_dist.low.cpu().numpy()
    theta_high = prior.base_dist.high.cpu().numpy()
    theta_range = theta_high - theta_low

    thetas = np.random.uniform(size=(num_simulations, 1)) * theta_range + theta_low
    x = [
        r
        for r in tqdm(
            Parallel(return_as="generator", n_jobs=num_processes)(
                delayed(simulation)(theta) for theta in thetas
            ),
            total=num_simulations,
        )
    ]

Results:
[screenshot of the benchmark timings]

@janfb
Contributor

janfb commented Jun 17, 2024

Looking into the code for simulate_for_sbi and simulate_in_batches, I also see that it really needs some refactoring. I will make a PR. If you like you can contribute the joblib fix there as well.

@plcrodrigues
Contributor

@tomMoral might have a word or two to say about this?

@janko-petkovic
Contributor Author

janko-petkovic commented Jun 17, 2024

Very glad to see this! Also, very interestingly, the fix you proposed seems to work better than both p_tqdm and the current simulate_for_sbi in the case in which

def simulation(theta):
    '''Some generic simulation function.
    You can change this to study, e.g. the impact of numpy's multithreading
    on the code's multiprocessing - np.linalg.eig'''

    matrix = np.random.rand(200,200)/theta
    eigval, eigvec = np.linalg.eig(matrix)

    return 1

where the internal multithreading of np.linalg.eig is known to sometimes interfere with the higher-level multiprocessing. Good that joblib takes care of this and that the fix can be implemented somewhat easily!

Looking into the code for simulate_for_sbi and simulate_in_batches, I also see that it really needs some refactoring. I will make a PR. If you like you can contribute the joblib fix there as well.

Sure, I'd love to. How can we organize?

@janfb
Contributor

janfb commented Jun 17, 2024

@tomMoral might have a word or two to say about this?

yes please!

@janfb
Contributor

janfb commented Jun 17, 2024

Sure, I'd love to. How can we organize?

Please follow the instructions here: https://github.com/sbi-dev/sbi/blob/main/docs/docs/contribute.md
In short: create a fork, work on your own feature branch, install pre-commit to run hooks for code style conventions, then make a PR. Let me know if anything is unclear.

Regarding the refactoring: I would

  • get rid of simulate_in_batches entirely and just make a call to joblib wrapped in tqdm directly in simulate_for_sbi
  • make sure to wrap the simulator with a seed when a seed is passed (see simulate_in_batches)
  • make sure to pass batches of theta if batch_size>1 (see simulate_in_batches); a rough sketch of these points follows below
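
A rough sketch of how these three points could fit together (hypothetical: the helper name simulate_in_parallel and the seeding scheme are made up for illustration, not the actual PR; it also assumes the simulator accepts a batch of theta and returns a tensor with a batch dimension):

import torch
from joblib import Parallel, delayed
from tqdm import tqdm


def simulate_in_parallel(simulator, theta, num_workers=1, batch_size=1, seed=None):
    '''Hypothetical joblib+tqdm replacement for simulate_in_batches.'''

    def seeded_simulator(theta_batch, job_seed):
        # Re-seed inside each job so results are reproducible when a seed is passed.
        if job_seed is not None:
            torch.manual_seed(job_seed)
        return simulator(theta_batch)

    # Pass batches of theta instead of single parameter sets when batch_size > 1.
    batches = torch.split(theta, batch_size)
    if seed is None:
        seeds = [None] * len(batches)
    else:
        rng = torch.Generator().manual_seed(seed)
        seeds = torch.randint(0, 2**31, (len(batches),), generator=rng).tolist()

    xs = [
        x
        for x in tqdm(
            Parallel(return_as="generator", n_jobs=num_workers)(
                delayed(seeded_simulator)(batch, s) for batch, s in zip(batches, seeds)
            ),
            total=len(batches),
        )
    ]
    return torch.cat(xs, dim=0)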

One strange thing I noticed and haven't understood yet: in the benchmark, you sampled thetas using a numpy.random call inside each function. You could have just sampled from the prior with prior.sample((num_simulations,)).

However, if I do so, the following call to joblib is much slower 🤔, e.g., dropping from 600 it/s to 150 it/s. Can you confirm this?

@janko-petkovic
Contributor Author

janko-petkovic commented Jun 17, 2024

Great! I will start working on it then.

Concerning your observation: yes, I can confirm that, and it seems to happen because, again, the CPU saturation is reduced. From what I see, it is connected to the fact that prior.sample outputs a torch.Tensor. Indeed, with

thetas = prior.sample((num_simulations,)).tolist()
# [... joblib code ...]

everything goes back to normal, and the CPU returns to full saturation, with equal runtimes across the board. Why torch conflicts with joblib I have, unfortunately, no idea yet 😆

@janfb
Contributor

janfb commented Jun 18, 2024

Here is another short benchmark to check how the simulator affects the parallelization with joblib. It's really brute force, but I wanted a clear picture of the strange behavior from above.

I test four different simulators. One that

  • takes and returns numpy arrays and uses numpy.random.randn,
  • takes and returns numpy arrays but internally uses torch.randn
  • and the same two combinations for torch arguments and returns.

Additionally, I also test whether passing a list of args to joblib changes the timings.

Here are the results:
[screenshot of the per-simulator timings]

To summarize: It seems that torch.randn is just much faster than np.random.randn and that the type of the inputs or returns does not matter.

ChatGPT even created a histogram from it, which shows it quite clearly: argument type does not matter, but torch vs numpy does, i.e., internally using torch is much faster.
[histogram comparing the four simulators]

import time

import numpy as np
import pytest
import torch
from joblib import Parallel, delayed
from torch import Tensor
from tqdm import tqdm


def numpy_simulator(theta: np.ndarray) -> np.ndarray:
    if isinstance(theta, Tensor):
        theta = theta.numpy()
    for _ in range(100):
        np.random.randn(10000) + np.random.randn(*theta.shape)

    return theta


def numpy_torch_simulator(theta: np.ndarray) -> np.ndarray:
    if isinstance(theta, np.ndarray):
        theta = torch.from_numpy(theta)
    for _ in range(100):
        torch.randn(10000) + torch.randn_like(theta)

    return theta.numpy()


def torch_simulator(theta: Tensor) -> Tensor:
    if isinstance(theta, np.ndarray):
        theta = torch.from_numpy(theta)
    for _ in range(100):
        torch.randn(10000) + torch.randn_like(theta)

    return theta


def torch_numpy_simulator(theta: Tensor) -> Tensor:
    if isinstance(theta, Tensor):
        theta = theta.numpy()
    for _ in range(100):
        np.random.randn(10000) + np.random.randn(*theta.shape)

    return torch.from_numpy(theta)


@pytest.mark.parametrize("type", ["numpy", "torch", "list"])
@pytest.mark.parametrize(
    "simulator",
    [numpy_simulator, numpy_torch_simulator, torch_simulator, torch_numpy_simulator],
)
def test_joblib_benchmark(simulator, type):
    num_simulations = 10000
    if type == "torch":
        theta = torch.distributions.Uniform(-1, 1).sample((num_simulations,))
    elif type == "numpy":
        theta = np.random.uniform(size=(num_simulations, 1))
    elif type == "list":
        theta = [np.random.uniform(size=(1,)) for _ in range(num_simulations)]
    num_processes = 10

    tic = time.time()
    x = [
        r
        for r in tqdm(
            Parallel(return_as="generator", n_jobs=num_processes)(  # type: ignore
                delayed(simulator)(batch) for batch in theta
            ),
            total=num_simulations,
            disable=False,
        )
    ]
    toc_joblib = time.time() - tic
    # print the time for given simulator
    print(f"{simulator.__name__}; arg type: {type}: {toc_joblib:.2f} seconds \n")

To run it, do pytest benchmark_file.py::test_joblib_benchmark -s

@janko-petkovic
Contributor Author

I am able to reproduce what you find on my machine; indeed, torch.randn seems to be much faster.
One important thing, however: 10,000 runs do not show any unexpected behaviour; to observe it clearly, 1M runs are necessary. In that case, one can see that:

  • theta of type numpy or list produces benchmarks at 300 - 1000 it/s, with the expected difference between torch.randn and np.random.randn,
  • theta of type torch produces benchmarks around 100 it/s, together with reduced CPU saturation (on Linux one can see this directly in htop while the tests are running).

I think that this behaviour does not depend on torch.randn but on some "conflict" between joblib and torch. I still need to read more into this, however.
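
For reference, the conversion workaround from the earlier comment (prior.sample(...).tolist() before handing the thetas to joblib), written out as a self-contained sketch; the "slow path" and "fast path" labels reflect the benchmarks reported in this thread, not a root-cause explanation:

import numpy as np
import torch
from joblib import Parallel, delayed
from sbi.utils import BoxUniform


def simulation(theta):
    '''Same toy simulator as in the benchmark at the top of this thread.'''
    for _ in range(100):
        np.random.randn(10000)
    return 1


prior = BoxUniform(low=torch.tensor([0.0]), high=torch.tensor([10.0]))
num_simulations, num_processes = 100_000, 16  # smaller than 1M, just to keep the example quick

# Slow path observed above: iterating a torch.Tensor hands small tensors to every job.
thetas_torch = prior.sample((num_simulations,))

# Fast path observed above: convert once, so each job receives a plain Python list instead.
thetas_list = thetas_torch.tolist()

x = Parallel(n_jobs=num_processes)(delayed(simulation)(theta) for theta in thetas_list)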

@janfb
Contributor

janfb commented Jun 18, 2024

OK, thanks for checking this. It would be interesting to get @tomMoral's take on this.

Also, this might be a relevant read: https://pytorch.org/docs/stable/notes/multiprocessing.html
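
Not something verified in this thread, but since process-level parallelism on top of torch's own intra-op threading can oversubscribe the CPU, one speculative thing to try in the benchmark is capping torch's threads inside each job (torch.set_num_threads is an existing torch call; whether it changes anything here is untested):

import torch
from joblib import Parallel, delayed


def simulation_single_threaded(theta):
    # Cap torch's intra-op threads inside the worker so process-level
    # parallelism does not fight torch's internal thread pool.
    torch.set_num_threads(1)
    for _ in range(100):
        torch.randn(10000)
    return 1


thetas = torch.rand(10000, 1) * 10.0  # toy thetas in [0, 10), as in the benchmark prior
x = Parallel(n_jobs=16)(delayed(simulation_single_threaded)(t) for t in thetas)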

@janko-petkovic
Contributor Author

Hello guys. A quick update on the state of the issue. After discussing with @janfb, we decided to proceed as follows:

  1. a first PR with an additional test will be submitted (tests/cpu_saturation_test.py). This test is meant to let the tester observe the CPU usage (via the process manager or htop) while a large number of simulations is running (a rough sketch of such a test follows at the end of this comment).
  2. a second PR with the actual refactoring and hotfix of simulate_for_sbi. Once (and if) it is accepted, one should run cpu_saturation_test.py again and compare the results, hopefully observing a considerable improvement.

I will now do the first PR. Please let me know if you have any concerns.
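
For orientation, a rough sketch of what tests/cpu_saturation_test.py could look like (hypothetical; the actual PR may structure it differently). The only goal is to keep enough simulations running that one can watch the CPU usage in htop or the process manager:

# tests/cpu_saturation_test.py (sketch); run with: pytest tests/cpu_saturation_test.py -s
import numpy as np
import torch

from sbi.inference import simulate_for_sbi
from sbi.utils import BoxUniform
from sbi.utils.user_input_checks import process_prior, process_simulator


def simulation(theta):
    '''Toy simulator: cheap per call, expensive in aggregate.'''
    for _ in range(100):
        np.random.randn(10000)
    return 1


def test_cpu_saturation():
    num_simulations = 1_000_000
    num_workers = 16

    prior = BoxUniform(low=torch.tensor([0.0]), high=torch.tensor([10.0]))
    prior, _, prior_returns_numpy = process_prior(prior)
    simulator = process_simulator(simulation, prior, prior_returns_numpy)

    # While this runs, all num_workers cores should stay close to full saturation.
    simulate_for_sbi(simulator, prior, num_simulations, num_workers)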
