Use global semaphores for concurrency limits (#18156)
Force-pushed from 8b427a2 to 599effa.
```rust
/// Concurrency limit settings.
#[derive(Copy, Clone, Debug)]
// TODO(konsti): We should find a pattern that doesn't require having both semaphores and counts.
```
Why do we require both semaphores and counts right now?
There's rayon (no tokio semaphores), and for the tokio part there's a `buffer_unordered` that needs a number as its limit.
For Rayon we may be able to use the current tokio runtime handle to `block_on` the semaphore `acquire` call. This would block that rayon thread, but in effect it would mean that if something else had also spawned a rayon thread pool, there would only be `concurrency.whatever` worth of actively working threads, with the other threads sleeping. This isn't quite as good as having rayon limit its thread pool size somehow, but it's probably sufficient to ensure we don't do too much stuff at the same time.
Regarding `buffer_unordered`, I worry that it could lead to us having futures which aren't getting polled if we ever do async work for each completed item of work.¹ And I think we should just use a slightly less error-prone abstraction for that case.
I'd say that fixing that would probably fall outside of this PR though.
Footnotes

1. @oconnor663 calls this "snoozing". There's also a good write-up of this case here: https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/barbara_battles_buffered_streams.html I'm not certain we don't run into this somewhere, but for the places I have looked at, we don't.
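The "block the rayon thread on the semaphore acquire" idea can be sketched without tokio or rayon. The `Semaphore` and `max_observed_concurrency` below are illustrative stand-ins, not uv code: a hand-rolled blocking semaphore plays the role of tokio's, and plain threads play the role of rayon workers that reach the acquire via `Handle::block_on`. Workers block in `acquire`, so at most `limit` of them are actively working at once.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (stable std has none); tokio's `Semaphore`
/// plays this role in the real code.
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cvar: Condvar::new() }
    }
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.cvar.wait(permits).unwrap();
        }
        *permits -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

/// Spawn `threads` workers that all block on a shared `limit`-slot semaphore
/// and report the peak number of workers active at the same time.
fn max_observed_concurrency(threads: usize, limit: usize) -> usize {
    let sem = Arc::new(Semaphore::new(limit));
    let active = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let (sem, active, peak) = (sem.clone(), active.clone(), peak.clone());
            thread::spawn(move || {
                // A rayon worker would block here via `Handle::block_on`.
                sem.acquire();
                let now = active.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10)); // simulated work
                active.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}

fn main() {
    // With 8 workers but only 3 permits, at most 3 are ever active;
    // the other 5 sleep inside `acquire`.
    assert!(max_observed_concurrency(8, 3) <= 3);
}
```

As the comment above notes, the cost of this approach is sleeping threads rather than a smaller pool, which is the trade-off being discussed.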
On that note, it feels weird to re-use this one type to hold both what the user configured and the semaphores. I imagine it would be a bit of churn, but we could add a new type for the user-facing side, avoid the `Debug` hack below, and simply have a function which creates a `Concurrency` (could be renamed if necessary) from a `ConcurrencyConfig` or whatever.
> For Rayon we may be able to use the current tokio runtime handle to `block_on` the semaphore `acquire` call. This would block that rayon thread, but this would in effect mean that if there was something which had also spawned a rayon thread pool, there would only be `concurrency.whatever` worth of actively working threads and the other threads would be sleeping. This isn't quite as good as having rayon limit its thread pool size somehow, but it's probably sufficient to ensure we don't do too much stuff at the same time.
My approach comes at it from the other end: since we mix different schedulers/runtimes, and given that rayon has the global initialization, a bit of data duplication really isn't that bad. I'd add even more complexity to this type if it reduces complexity at the sites where we run business logic.
> Regarding `buffer_unordered`, I worry that it could lead to us having futures which aren't getting polled if we ever do async work for each completed item of work. And I think we should just use a slightly less error-prone abstraction for that case.
Going off the original PR story because it's interesting: don't we avoid this problem by `buffer_unordered` being the last in the chain for us? As in, could Barbara have done this:
```rust
async fn do_work(database: &Database) {
    let work = do_select(database, FIND_WORK_QUERY).await?;
    let work_limit = Semaphore::new(1);
    stream::iter(work)
        .map(|item| async move {
            let work_item = do_select(database, work_from_item(item)).await;
            let _permit = work_limit.acquire().await.unwrap();
            process_work_item(database, work_item).await;
        })
        .buffered(5)
        .for_each(|()| std::future::ready(()))
        .await;
}
```

More practically, should we drop `buffer_unordered` and just collect all the futures into a `FuturesUnordered` directly, given that the limit is now internal and ideally moves into the client eventually?
My point regarding rayon was not really about duplication, just that if we use a semaphore in some places and the raw number in others, we're still doing more work than we claim we want to do, if we ever end up using both concurrently. So if we set up a rayon thread pool with x threads and we have a semaphore with x slots, and we do work both in the thread pool and outside with the semaphore, then we're doing up to 2x worth of work. If we then also use x in a third place, for `buffer_unordered`, we're doing up to 3x worth of work.

If the goal is to only ever do up to 1x worth of work, then we'd need everything to use the semaphore.
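The arithmetic above can be made concrete with a std-only sketch (all names illustrative, with a hand-rolled semaphore standing in for both the rayon pool cap and tokio's `Semaphore`): two independent limits of x each allow up to 2x tasks in flight overall, while sharing one semaphore restores the global cap of x.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore, standing in for tokio's.
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cvar: Condvar::new() }
    }
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.cvar.wait(permits).unwrap();
        }
        *permits -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

/// Run two groups of tasks, each gated by its own limit of `x` (or by one
/// shared limit when `shared` is true), and return the peak total concurrency.
fn peak_concurrency(shared: bool, x: usize, tasks_per_group: usize) -> usize {
    // Group A's limit stands in for a rayon pool of `x` threads;
    // group B's for a separate `x`-slot semaphore.
    let limit_a = Arc::new(Semaphore::new(x));
    let limit_b = if shared { Arc::clone(&limit_a) } else { Arc::new(Semaphore::new(x)) };
    let active = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();
    for limit in [limit_a, limit_b] {
        for _ in 0..tasks_per_group {
            let (limit, active, peak) = (limit.clone(), active.clone(), peak.clone());
            handles.push(thread::spawn(move || {
                limit.acquire();
                let now = active.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(20)); // simulated work
                active.fetch_sub(1, Ordering::SeqCst);
                limit.release();
            }));
        }
    }
    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}

fn main() {
    // Two independent limits of 2: up to 4 tasks can run at once.
    assert!(peak_concurrency(false, 2, 6) <= 4);
    // One shared limit of 2: never more than 2, no matter how tasks are split.
    assert!(peak_concurrency(true, 2, 6) <= 2);
}
```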
Regarding your suggestion for Barbara's code: it would fix the issue, but it wouldn't quite preserve the semantics, although it's maybe a better solution than what that page suggests.

The issue is that once a `do_select` completes, we would ideally want to start another `do_select` immediately while we `process_work_item` on the one that just completed. In your version, the actual number of concurrent `do_select` calls will only be 4 while there is a `process_work_item` being progressed. In the extreme case, if we wanted to concurrently do up to 1 select and process up to 1 work item, we would actually only ever be doing one or the other.

However, there is still the question of how to make the "5" in that code (passed to `buffered(5)`) use a semaphore instead. One option is to create all the futures for all work items up front and then use the approach you have shown, but that could take up quite a lot of memory if the futures have a lot of state. That's one of the reasons why `.buffered(5)` is preferred there: we only ever hold 5 futures for the work tasks at a time.

It's a bit tricky to figure out how to limit the number of futures you keep around based on the number of semaphore slots you have. But we could limit the number of futures to the maximum concurrency we could ever have, and only progress them once we can acquire the semaphore. Something like:
```rust
let post_processing_limit = 1;
let post_processing_limit_semaphore = Semaphore::new(post_processing_limit);
stream::iter(work)
    .map(|item| async move {
        let result = {
            let _permit = concurrency_limit_semaphore.acquire().await.unwrap();
            do_work_on(item).await
        };
        let _permit = post_processing_limit_semaphore.acquire().await.unwrap();
        post_process(result).await;
    })
    .buffer_unordered(concurrency_limit)
    .for_each(|()| std::future::ready(()))
    .await;
```
So I guess we could, with some caveats, use `buffer_unordered` after all. But if we do use it, we should wrap it up in some abstraction which doesn't require us to keep writing something like the above everywhere. It's probably a good solution for the time being. We could even pass `concurrency_limit + post_processing_limit` to `buffer_unordered` to solve the issue I mentioned above with us doing less work than we intend.
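A synchronous analogue of the "wrap it up in some abstraction" suggestion (a sketch with hypothetical names; a hand-rolled blocking semaphore replaces tokio's, and scoped threads replace the stream machinery): one helper owns both semaphores and the two-phase pattern, so call sites only pass the limits and the two closures.

```rust
use std::sync::{Condvar, Mutex};
use std::thread;

/// Minimal counting semaphore, standing in for tokio's.
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cvar: Condvar::new() }
    }
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.cvar.wait(permits).unwrap();
        }
        *permits -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

/// Hypothetical wrapper for the two-phase limit pattern: at most `work_limit`
/// items in the work phase and `post_limit` in post-processing at once.
fn bounded_two_phase<T, R, W, P>(items: Vec<T>, work_limit: usize, post_limit: usize, work: W, post: P)
where
    T: Send,
    R: Send,
    W: Fn(T) -> R + Sync,
    P: Fn(R) + Sync,
{
    let work_sem = Semaphore::new(work_limit);
    let post_sem = Semaphore::new(post_limit);
    thread::scope(|scope| {
        for item in items {
            let (work_sem, post_sem, work, post) = (&work_sem, &post_sem, &work, &post);
            scope.spawn(move || {
                // The work-phase permit is released before post-processing,
                // mirroring the scoped `_permit` in the async version above.
                work_sem.acquire();
                let result = work(item);
                work_sem.release();
                post_sem.acquire();
                post(result);
                post_sem.release();
            });
        }
    });
}

fn main() {
    let results = Mutex::new(Vec::new());
    bounded_two_phase((0..10).collect(), 3, 1, |n: i32| n * 2, |r| {
        results.lock().unwrap().push(r);
    });
    let mut collected = results.lock().unwrap().clone();
    collected.sort();
    assert_eq!(collected, (0..10).map(|n| n * 2).collect::<Vec<_>>());
}
```

The point of the wrapper is exactly what's argued above: the semaphores and the permit-scoping discipline live in one place, and the business-logic call sites can't get the pattern subtly wrong.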
> My point regarding rayon was not really about duplication, just about the fact that if we use a semaphore in some places and we just use the raw number in other places, we're still doing more work than what we claim we want to do, if we ever end up using both concurrently. So if we set up a rayon thread pool with x threads and we have a semaphore with x slots and we do work in the thread pool and outside with the semaphore then we're doing up to 2x worth of work. If we then also use x in a third place where we use it for `buffer_unordered` we're doing up to 3x worth of work.
>
> If the goal is to only ever do up to 1x worth of work, then we'd need everything to use the semaphore.
I see! In our case installs are rayon, and only installs (and unzip, but that's also install except for legacy source dists) use rayon, so there's no semaphore for rayon.
[...]
> So I guess we could with some caveats use `buffer_unordered` after all, but if we do use it, we should wrap it up in some abstraction which doesn't require us to keep writing something like the above everywhere. It's probably a good solution for the time being. We could even do `concurrency_limit + post_processing_limit` in the `buffer_unordered` to solve the issue I mentioned above with us doing less work than we intend.
Thanks for the explanation!
Force-pushed from 599effa to 1369a7b.
Avoids problems such as #15307; follow-up to #18054. See also #17633, for which this should be helpful.