
Feature request: Adding a 'Thread Affinity' when using parallel features to control Thread usage by jobs #630

Open
fredericvauchelles opened this issue Jan 27, 2019 · 7 comments


@fredericvauchelles

Motivation

To optimize thread usage, at the start of your application you would instantiate one thread per core and use work stealing to avoid thread context switches. This fits perfectly with Rayon.

However, it is currently impossible to restrict the number of threads used for specific jobs in a pool.

Use case

In my application, I will have two kinds of jobs: short and long jobs.

Short jobs need to be processed in real time and can't stall, whereas long jobs are less constrained in time.

Currently, if I schedule a parallel iterator that executes a long job, it can run on all the threads simultaneously, and no short jobs can be executed in the meantime.

Proposal

  • When creating a ThreadPool, one can provide the ThreadAffinity for each spawned thread.
  • When using a parallel feature, one can provide the ThreadAffinity to use for the jobs.

Example of the API: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=08f720e91377bc8d6c823a69b10ef610

Highlights of the example:

  • Initialization of the pool with specific affinities
let pool = rayon::ThreadPoolBuilder::new()
    .num_threads(8)
    // Define affinity per thread
    .thread_affinity(|thread_index| {
        if thread_index < 4 {
            1 // Affinity bitmask for short jobs (user defined)
        } else {
            3 // Affinity bitmask for short and long jobs (user defined)
        }
    })
    .build()
    .unwrap();
  • Setting an affinity when using parallel iterators
let values = vec![1; 1_000];
    
let sum_fast: i32 = values
    .par_iter(affinity: 1 /* AFFINITY_SHORT */)
    .map(|&i| i + 1)
    .sum();

Implementation suggestion

(These are the main spots I am thinking of; there will probably be other places to update.)

Updating registry::in_worker

  • If calling from a non-worker thread, schedule the job for the provided affinity
  • If calling from a worker thread,
    • If the affinity matches, execute the job
    • If the affinity doesn't match, schedule the job for the provided affinity, then, while the job is not completed, steal jobs matching this worker's affinity

Updating WorkerThread::take_local_job

  • The thread can only steal from queues matching its affinity
  • Use a round-robin strategy when multiple affinities are assigned to the worker
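
A rough pseudocode sketch of the in_worker change described above (Registry, WorkerThread, JobRef, and the affinity helpers are stand-ins, not rayon's actual internals):

// Pseudocode only: these types and methods do not exist as written.
fn in_worker(registry: &Registry, job: JobRef, affinity: Affinity) {
    match WorkerThread::current() {
        // Called from outside the pool: queue the job for its affinity.
        None => registry.inject(job, affinity),
        // Called from a worker whose affinity matches: run it in place.
        Some(worker) if worker.affinity().matches(affinity) => worker.execute(job),
        // Affinity mismatch: hand the job off, then keep this worker busy
        // stealing only jobs that match its own affinity until completion.
        Some(worker) => {
            registry.inject(job, affinity);
            while !job.is_complete() {
                worker.steal_matching(worker.affinity());
            }
        }
    }
}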

What do you think of it?

@cuviper
Member

cuviper commented Feb 2, 2019

Short jobs need to be processed in real time and can't stall, whereas long jobs are less constrained in time.

Your short jobs still have the potential to stall among themselves - for example, when waiting to join a result from another thread. At a stall, that thread will try to steal other jobs. If the affinity allows it to steal one of the long jobs, you'd have a priority inversion, delaying the completion of the short job.

Similarly, if threads in the pool are already busy on a long job, they're not going to be preempted when a new short job comes in, so the short job won't have the full potential resources available.

We don't have direct support for priorities, nor any design to do so. I would suggest using separate ThreadPool instances, and you can use the start_handler to set OS-level priorities and CPU affinities as needed. Use explicit calls like pool.install(...) to run something on a particular pool.
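
For instance, a minimal sketch of that setup, assuming the third-party core_affinity crate for the actual pinning (any OS-level affinity call would do):

use rayon::prelude::*;

// Pool reserved for short, latency-sensitive jobs, pinned to cores 0-5.
let short_pool = rayon::ThreadPoolBuilder::new()
    .num_threads(6)
    .start_handler(|index| {
        core_affinity::set_for_current(core_affinity::CoreId { id: index });
    })
    .build()
    .unwrap();

// Smaller pool for long jobs, pinned to the remaining cores 6-7.
let long_pool = rayon::ThreadPoolBuilder::new()
    .num_threads(2)
    .start_handler(|index| {
        core_affinity::set_for_current(core_affinity::CoreId { id: 6 + index });
    })
    .build()
    .unwrap();

// Run work explicitly on a particular pool.
let sum: i32 = short_pool.install(|| (0..1_000).into_par_iter().sum());
long_pool.spawn(|| { /* long background job */ });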

@fredericvauchelles
Author

@cuviper Thanks for the answer.

Concerning resource usage, I think I should rephrase what I am trying to do; I'll go with an example:
If I have 8 logical threads available, I want 6 threads to process only short jobs, and 2 threads to process long jobs first and then short jobs.

I know that I will constantly issue a lot of short jobs, but long jobs will only be issued occasionally. That way, short jobs will always be consumed by 6 to 8 threads, and long jobs, if any, will be consumed by at most 2 threads.
It is OK for those 2 threads to run short jobs after a long job when there is no other long job to process, but not for the other 6.

Concerning the use of different thread pools, that is the way I am currently going, before attempting any modification to rayon. But I have two issues with it:

  1. pool.install sets the global thread pool, so all parallel iterators will run on the last thread pool installed, not the thread pool of the current worker
  2. (Maybe a minor optimization, but if possible, why not) Avoid thread context switches by having exactly one thread per logical core.

@nikomatsakis
Member

@fredpointzero As @cuviper said, the way we intended to deal with this was via ThreadPool. But one of your comments suggests that there may be some confusion:

pool.install sets the global thread pool, so all parallel iterators will run on the last thread pool installed, not the thread pool of the current worker

In fact, parallel iterators do use the thread-pool of the current worker. But what pool.install actually does is to change the current worker by executing the closure over in a (potentially different) thread-pool.
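
To make this concrete, here is a small sketch of that behavior (the pool names are just illustrative):

use rayon::prelude::*;

let frame_pool = rayon::ThreadPoolBuilder::new().num_threads(6).build().unwrap();
let long_pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();

frame_pool.install(|| {
    // Inside this closure the current worker belongs to frame_pool,
    // so this parallel iterator runs on frame_pool's threads.
    let _sum: i32 = (0..100).into_par_iter().sum();

    // A nested install switches the current worker to long_pool
    // for the duration of the inner closure.
    long_pool.install(|| {
        (0..100).into_par_iter().for_each(|_| { /* long work */ });
    });
});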

It might be useful to get a few more details about how your 'long, slow' jobs work -- are they parallel iterators? Is that something where you might want a "future" -- i.e., the ability to run the job asynchronously and then demand it later? Can you use the thread_pool.spawn feature instead, to "push" the job into the "slow" thread-pool?

@fredericvauchelles
Author

Hi @nikomatsakis, thanks for the update. There was indeed some confusion about how install works.

A bit of context

I am currently building a realtime engine and therefore need to be precise about task scheduling.
The way my engine works is:

  1. You can schedule tasks (basically a closure)
  2. Tasks can be chained; in that case, they receive as input the output of the previous tasks.
  3. Tasks can be executed either on a Frame queue or a Long queue.

So you end up with a graph of tasks to execute, where clusters of tasks can be identified by queue.
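
To illustrate, here is a simplified, self-contained sketch of this model (the Queue and Task types are invented for the example, not my actual engine code):

// Invented types, purely illustrative.
enum Queue { Frame, Long }

struct Task<T> {
    queue: Queue,
    run: Box<dyn FnOnce() -> T + Send>,
}

impl<T: Send + 'static> Task<T> {
    fn new(queue: Queue, f: impl FnOnce() -> T + Send + 'static) -> Self {
        Task { queue, run: Box::new(f) }
    }

    // Chaining: the next task receives the output of this task as input.
    fn then<U: Send + 'static>(
        self,
        queue: Queue,
        f: impl FnOnce(T) -> U + Send + 'static,
    ) -> Task<U> {
        Task { queue, run: Box::new(move || f((self.run)())) }
    }
}

// Example: a Frame task whose output feeds a Long task.
let pipeline = Task::new(Queue::Frame, || 21).then(Queue::Long, |x| x * 2);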

The task contract

The contract I want to enforce, is:

  • Whenever a task in the Frame queue is scheduled, all tasks belonging to the same cluster must complete during this frame.
  • Long tasks don't have time constraints.

Current implementation

I am currently using two ThreadPools: one for the Frame queue, another for the Long queue.
There will always be frame tasks running, and they must have the highest throughput possible. Long tasks, however, are more unpredictable.

So my goal with this issue, is to maximize the usage of CPU cores while ensuring above contract.

Currently, my setup for an 8-core CPU is:
- 8 Frame workers and 2 Long workers.
- The main thread becomes a Frame worker (it calls install on the Frame ThreadPool).

Execution of tasks is as follows:
- While there are long tasks pending execution, spawn them on the Long ThreadPool.
- While there are frame tasks pending execution, use a ParallelIterator to execute them (fork-join).

The implementation of each task will probably itself use ParallelIterators, so it is important that those are scheduled on the appropriate pool, but this is already the case :).
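
A rough sketch of that execution scheme (the frame_tasks and long_tasks collections are hypothetical stand-ins for my queues):

use rayon::prelude::*;

fn run_frame(
    frame_pool: &rayon::ThreadPool,
    long_pool: &rayon::ThreadPool,
    frame_tasks: &[Box<dyn Fn() + Send + Sync>],
    long_tasks: Vec<Box<dyn FnOnce() + Send>>,
) {
    // Long tasks are pushed onto the Long pool and run in the background.
    for task in long_tasks {
        long_pool.spawn(task);
    }
    // Frame tasks are executed fork-join on the Frame pool.
    frame_pool.install(|| {
        frame_tasks.par_iter().for_each(|task| task());
    });
}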

Issue

If I want to push things a bit further, I will want to avoid context switches and have one thread per core. But this implies that Frame tasks and Long tasks compete for the same threads, and I don't know whether this is possible.

(Although I still need to finish my implementation and profile it to understand the possible gains of removing context switches in that case.)

@fredericvauchelles
Author

An update from my side about the implementation of my scheduling system.

I ended up dropping the "long task" scheduling feature: trying to mix short and long task scheduling in a single system is not worth the effort:

  • The code complexity increases dramatically.
  • As the completion time of long tasks is unknown, it prevents some computations from being performed statically and makes deterministic scheduling barely possible.

Finally, concerning my usage of rayon, I use it like this:

  • Deterministic, statically defined scheduling for short tasks: I end up with a DAG to execute from start to end. Currently, I use a main loop with spawning and graph logic to execute the DAG, but I think a fork-join method is better suited. (This requires pre-RFC: rayon::yield_now #548, though.)
  • Background jobs, which are only spawned; I will probably use a future-like API to interact with those tasks.
    Both kinds of jobs use different thread pools with a different number of threads.

This does not cancel the initial feature request, though: it would be nice to have a single ThreadPool plus a way to describe how threads compete to take jobs.

Another way of thinking about it:

  1. Instantiate a ThreadPool with N threads.
  2. Create a ThreadPoolView from this ThreadPool that can use N threads, with priority 1 (lower priority is first to take jobs). This view will be used for short jobs.
  3. Create a ThreadPoolView from this ThreadPool that can use M threads (M < N), with priority 0, for long jobs.

I would fork-join short jobs on ThreadPoolView #1 and spawn long jobs on ThreadPoolView #2. That way, I am sure that:

  1. Short jobs will take as many available threads as possible.
  2. There are always threads available for short jobs.
  3. For long jobs, either there is a thread that can take them, or there will soon be one.

Maybe this kind of API would be better suited to fit with the existing rayon API?
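
A purely hypothetical sketch of how that could look (ThreadPoolView and all of its methods are invented here; none of this exists in rayon):

// Hypothetical API, not real rayon code.
let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();

// Short jobs: may use all 8 threads, priority 1 (taken second).
let short_view = pool.view().max_threads(8).priority(1);
// Long jobs: may use at most 2 threads, priority 0 (taken first).
let long_view = pool.view().max_threads(2).priority(0);

short_view.install(|| { /* fork-join short jobs */ });
long_view.spawn(|| { /* long background job */ });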

@l4l

l4l commented Apr 21, 2022

After #636 it's possible to implement at least the first part manually via spawn_handler:

ThreadPoolBuilder::new()
    .spawn_handler(|t| {
        std::thread::spawn(move || {
            // Core sets are user defined, e.g. via the `affinity` crate.
            let _ = affinity::set_thread_affinity(match t.index() {
                0..=3 => &[1],
                _ => &[3],
            });
            t.run();
        });
        Ok(())
    })

Apparently the second part could be resolved via multiple ThreadPools. Can the issue be closed, then?

@cuviper
Member

cuviper commented Apr 25, 2022

@l4l a start_handler should be able to do that too, if you don't need control of the spawn itself.
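
For instance (with the same user-defined core sets as above, via e.g. the affinity crate):

ThreadPoolBuilder::new()
    .start_handler(|index| {
        // Runs on each worker thread as it starts; rayon still owns the spawn.
        let _ = affinity::set_thread_affinity(if index < 4 { &[1] } else { &[3] });
    })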
