Add WaitGroup synchronization primitive #14167

straight-shoota merged 17 commits into crystal-lang:master

Conversation
This is more efficient than creating a Channel(Nil) and looping to receive N messages: we don't need a queue, only a counter, and we can avoid spurious wakeups of the main fiber and resume it only once.
Note to self: this (or rather its MT equivalent) is also known as a latch in C++ and Java.
straight-shoota left a comment
I think we'll need a bunch more tests for this. E.g. for multiple #wait calls, fibers adding more fibers, #add with negative delta or #done called before #wait. The latter two could both result in @counter < 0 which is a relevant invariant to verify.
The Go implementation has some test cases that we could take inspiration from: https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/sync/waitgroup_test.go
As food for thought: https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/sync/waitgroup.go
Go merges the counter (i32) and the number of waiters (u32) into a single atomic (u64), which is a neat idea, then uses a semaphore to suspend the goroutines. What I'm curious about is: …

I'm afraid following Go won't be possible, because their implementation takes advantage of the atomics returning the new value (e.g. add to counter => returns new counter + waiters), but Crystal relies on LLVM atomics that always return the old value, which... is often pointless 😞 For example, to support …
```crystal
# the fiber will be resumed.
#
# Can be called from different fibers.
def wait : Nil
```
I like this addition very much! It will be very useful in many cases.
One proposal, which could probably be done later as a separate improvement, is to make the `wait` method compatible with `select` to support the following snippet:

```crystal
select
when wg.wait
  puts "All fibers done"
when timeout(X.seconds)
  puts "Some fiber stuck"
end
```
Or maybe just have `wg.wait(timeout: Time::Span | Nil)`?
@bararchy We're missing a generic mechanism for timeouts... but we could abstract how it's implemented for select so that could be doable.
That doesn't mean we can't also integrate with select: we could wait on channel(s) + waitgroup(s) + timeout. Now, I'm not sure how to do that.
@ysbaddaden I think @alexkutsan's idea is better, because then we don't need to handle a timeout exception in case the timeout happens; instead we handle it in the `select` context, which seems cleaner, like how a channel works when calling `receive`, etc.
So I think my idea is less clean tbh 😅
I have a commit to support WaitGroup in select expressions.
The integration wasn't complex once I understood how SelectAction and SelectContext work, but the current implementation is very isolated to Channel (on purpose). Maybe the integration is not a good idea, but if it proves to be a good one, we might want to extract the `select` logic from Channel into the Crystal namespace.
I'll open a pull request after this one is merged, so we can have a proper discussion.
@ysbaddaden now that it's in and merged, are you planning to make the followup PR? 👁️
Co-authored-by: Johannes Müller <straightshoota@gmail.com>
Co-authored-by: Jason Frey <fryguy9@gmail.com>
Disables the stress test when interpreted, as it takes forever to complete.
This failure consistently appears in #14122 as well.
And in #14257.
I can't reproduce the "can't resuming running fiber" anymore when I disable the thread specs. I think we likely need actual support from the interpreter to start threads in the interpreted code.
Co-authored-by: Sijawusz Pur Rahnama <sija@sija.pl>
At the cost of using a CAS loop. The performance implications depend on whether, in practice, waitgroups will be protecting "large" operations (i.e. the time each thread spends maintaining the waitgroup is not dominant, and each CAS op is very likely to succeed) or "small" operations (i.e. the thread spends most of its time maintaining the waitgroup, and each CAS op is likely to fail). My intuition is that it's the former, and the performance implications of using a CAS loop would be tiny. Apologies for bringing this up at a "late" stage in the PR; this didn't occur to me until now.
@RX14 Interesting. I'm not concerned about performance (it's still lock-free, so it's fine to me), but: …

Or am I missing something?
Thanks @RX14. I'm noticing more edge cases. For example, we could reach a negative number (raises), then a concurrent fiber could increment back to a positive number and fail to raise because the new counter is positive, also impacting the resumed waiting fibers, which may continue despite the waitgroup being left in an invalid state. I'll likely add a CAS loop, just not one saturating at zero: it will keep the negative number to detect the invalid state.
It's now impossible for `#add` to increment a negative counter back into a positive one. `#wait` now checks for a negative counter in addition to a zero counter right after grabbing the lock.
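A minimal Go sketch of such a guarded CAS loop (hypothetical helper names; not the actual Crystal code) shows the invariant: once the counter has gone negative, no increment can "heal" it back to positive, so the misuse is always detected.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// add updates the counter with a CAS loop. Once the counter has gone
// negative (a misuse), it stays negative: any further add detects the
// invalid state and raises instead of silently repairing the counter.
func add(counter *int32, delta int32) int32 {
	for {
		old := atomic.LoadInt32(counter)
		if old < 0 {
			panic("waitgroup: counter is negative (misused)")
		}
		if atomic.CompareAndSwapInt32(counter, old, old+delta) {
			return old + delta
		}
		// CAS failed: another fiber/thread won the race;
		// retry against the fresh value.
	}
}

func main() {
	counter := int32(0)
	add(&counter, 2)               // 0 -> 2
	add(&counter, -1)              // 2 -> 1
	fmt.Println(add(&counter, -1)) // 1 -> 0, prints 0
}
```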
I smoothed out some corner cases: …

I'm a bit torn about the last one: the situation can happen when the counter reached zero, enqueued waiters, and continued to increment, which is invalid. Yet there is a race condition when reusing a WaitGroup with at least 2 waiters: fiber A reuses the WaitGroup (i.e. increments) before fiber B resumes (positive counter -> raise). Oh, the race condition would also trigger with a negative counter (lower probability, but it could happen), so the problem is reusing the object before all waiters are properly resumed. Ah, the joys of writing a synchronization primitive.
My highest concern is deadlocks: any condition where the counter remains at zero but fails to resume a waiter. There are race conditions when the waitgroup is used weirdly, but they do not cause a wrong count or a deadlock, so they can fail in a better way (raising).
AFAIK it should now be impossible to fail to wake the waiting fibers or to leave the waitgroup in a confusing state: the counter saturates at a negative number and can't return to a positive number anymore; waiting fibers are always resumed (once) when the counter reaches zero or below. I can't think of any scenario where we'd end up with a deadlock. I can still think of race conditions, though: depending on when the fibers are resumed, some may return successfully (zero counter) while some may raise (negative counter), yet at least one fiber will raise (the one decrementing the counter below zero), so the error shouldn't go unnoticed.
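The "resume waiters once the counter reaches zero or below" rule can be sketched in Go (hypothetical helper names, not the Crystal implementation; `wake` is assumed to drain the waiter list, so a repeated call on an already-drained list is harmless):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// done decrements the counter and, when it reaches zero or below,
// wakes the waiters. A negative result means done was called more
// times than add, which is reported by panicking, mirroring the
// "at least one fiber will raise" behaviour described above.
func done(counter *int32, wake func()) {
	n := atomic.AddInt32(counter, -1)
	if n <= 0 {
		wake()
	}
	if n < 0 {
		panic("waitgroup: negative counter")
	}
}

func main() {
	counter := int32(2)
	woken := 0
	wake := func() { woken++ }

	done(&counter, wake) // 2 -> 1: nothing happens
	done(&counter, wake) // 1 -> 0: waiters are woken

	fmt.Println(counter, woken) // prints: 0 1
}
```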
I cannot wait to use this over some Channels that I have.
Same! Very excited for this one 🎉
This pull request has been mentioned on Crystal Forum. There might be relevant details there: https://forum.crystal-lang.org/t/crystal-and-parallelism/7716/7
This is more efficient than creating a `Channel(Nil)` and looping to receive N messages: we don't need a queue, only a counter, and we can avoid spurious wakeups of the main fiber and resume it only once.

See the documentation for examples and more details.