-
-
Notifications
You must be signed in to change notification settings - Fork 8
RFC 0003: WaitGroup #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
cecf0ec
rfc-wait-group
ysbaddaden b7b3cc1
Rename 0003-wait-group.md to text/0003-wait-group.md
ysbaddaden efea7cf
Fix: implementation PR link
ysbaddaden 2aca313
Add #spawn(&) + Pond shard reference
ysbaddaden 7d8b36c
Fix: reaches "below zero" not "zero"!
ysbaddaden 2d3c495
Add details about possible correctness error
ysbaddaden e7e563f
Fix: not undefined behavior
ysbaddaden 34f5b43
Remove undefined behavior unresolved question
ysbaddaden f926264
Resume fibers + raise when counter is negative
ysbaddaden File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,176 @@ | ||
| - Feature Name: `wait_group` | ||
| - Start Date: 2024-02-06 | ||
| - RFC PR: [crystal-lang/rfcs#3](https://github.com/crystal-lang/rfcs/pull/3) | ||
| - Implementation PR: [crystal-lang/crystal#14167](https://github.com/crystal-lang/crystal/pull/14167) | ||
|
|
||
| # Summary | ||
|
|
||
| Provide a mechanism to wait on the execution of a set of operations distributed to a set of fibers. | ||
|
|
||
| # Motivation | ||
|
|
||
| Applications currently rely on Channel(Nil) to implement this: | ||
|
|
||
| ```crystal | ||
| chan = Channel(Nil).new(256) | ||
|
|
||
| 256.times do |i| | ||
| spawn do | ||
| sliced_operation(i) | ||
| ensure | ||
| chan.write(nil) | ||
| end | ||
| end | ||
|
|
||
| 256.times { channel.receive } | ||
| ``` | ||
|
|
||
| In the above example, the main fiber will be resumed 256 times and the nil value be sent and received 256 times in the channel queue. Neither of these are necessary. | ||
|
|
||
| # Guide-level explanation | ||
|
|
||
| Introduce a new WaitGroup class that would keep a counter of how many fibers to wait for, each fiber would report when they're done, and the main fiber only be resumed once all fibers are done. | ||
|
|
||
| All methods can be called concurrently as well as in parallel (so the type must be thread-safe), and there may be multiple fibers waiting on the same WaitGroup at the same time. | ||
|
|
||
| The following rules must be respected: | ||
|
|
||
| 1. the counter must be incremented before it can be decremented; | ||
| 2. the counter must be incremented before a fiber can wait. | ||
|
|
||
| The main usage is very close to how we'd use a Channel(Nil), except that we resume the main fiber once (not N times) and we don't pass any value to a queue (less allocations, less moving data). The intent is also more clear: a fiber is waiting, a fiber reports that it terminated. | ||
|
|
||
| WaitGroup would also allow scenarios that aren't possible with Channel(Nil): | ||
|
|
||
| - Mutable counter: a WaitGroup may be modified at any time (but always _before_ the fiber calls `#done`) to increment or decrement the counter. The waiting fibers don't need to know about these changes: they will wait until all the execution is done. | ||
|
|
||
| - Signaling fibers: multiple fibers can wait on a WaitGroup, so we can signal a set of fibers at once. For example have a set of fibers wait before starting execution. | ||
|
|
||
| # Reference-level explanation | ||
|
|
||
| The proposed API: | ||
|
|
||
| ```crystal | ||
| class WaitGroup | ||
| def initialize(counter = 0) | ||
|
|
||
| # Increments the counter by *n* (decrements if n < 0). | ||
| # Resumes pending fibers when the counter reaches 0. | ||
| # Resumes pending fibers and raises RuntimeError if the counter reaches below 0. | ||
| def add(n : Int) : Nil | ||
|
|
||
| # Decrements the counter by 1. | ||
| # Resumes pending fibers when the counter reaches 0. | ||
| # Resumes pending fibers and raises RuntimeError if the counter reaches below 0. | ||
| def done : Nil | ||
| add(-1) | ||
| end | ||
|
|
||
| # Increments the counter by 1. | ||
| # Spawns a fiber to execute the given block, eventually decrementing the counter by 1. | ||
| # Returns the fiber. | ||
| def spawn(**args, &) : Fiber | ||
|
|
||
| # Blocks the current fiber until the counter reaches 0. | ||
| # A fiber must be resumed once, and only once. | ||
| # Raises RuntimeError if the counter reached below 0. | ||
| def wait : Nil | ||
| end | ||
| ``` | ||
|
|
||
| All methods can be called concurrently as well as in parallel (so the type must be thread-safe), and there may be multiple fibers waiting on the same WaitGroup at the same time. | ||
|
|
||
| The following example usage is very close to how we'd use a Channel, except that we resume the main fiber once (not 256 times) and we don't pass any value to a queue. | ||
|
|
||
| ```crystal | ||
| def sliced_operation(wg, i) | ||
| wg.add(32) | ||
|
|
||
| 32.times do |j| | ||
| spawn do | ||
| sub_sliced_operation(i, j) | ||
| ensure | ||
| wg.done | ||
| end | ||
| end | ||
| end | ||
|
|
||
| wg = WaitGroup.new(16) | ||
|
|
||
| 16.times do |i| | ||
| spawn do | ||
| prepare_slice(i) | ||
| sliced_operation(wg, i) | ||
| ensure | ||
| wg.done | ||
| end | ||
| end | ||
|
|
||
| wg.wait | ||
| ``` | ||
|
|
||
| # Drawbacks | ||
|
|
||
| We introduce a new synchronization primitive to fix an issue that could be non-existent with a different concurrency pattern (i.e. structured concurrency). | ||
|
|
||
| # Rationale and alternatives | ||
|
|
||
| Structured concurrency, where descendant fibers can't outlive their direct parent, could achieve the same behavior of the initial scenario (waiting on fibers), possibly obsoleting the proposed WaitGroup object. | ||
|
|
||
| The proposed WaitGroup type would still have some advantages: it can signal fibers, can wait on arbitrary fibers (albeit breaking the principle of structured concurrency), and at worst be a building block for waiting on said descendant fibers. | ||
ysbaddaden marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # Prior art | ||
|
|
||
| Go has the sync.WaitGroup type. Java has the CountDownLatch class. Both behave in a similar way as the proposed solution. | ||
|
|
||
| The [Earl](https://www.shardbox.org/shards/earl) shard uses a WaitGroup type in its Supervisor and Pool classes to wait on the child fibers it spawned. | ||
| The [Pond](https://github.com/GrottoPress/pond) shard implements a nursery-like spawner with a waiting mecanism. | ||
|
|
||
| # Correctness | ||
|
|
||
| ## Dynamic increments | ||
|
|
||
| The following example exhibits a situation where the loop that increments the counter may sometimes yield the current fiber, leading some fibers to call `#done` before the wait group has been fully incremented. With MT and work stealing the fibers may be resumed in parallel, even without yield. | ||
|
|
||
| ```crystal | ||
| wg = WaitGroup.new | ||
| 16.times do | ||
| wg.add(1) | ||
| spawn { wg.done } | ||
| do_sometimes { Fiber.yield } | ||
| end | ||
| wg.wait | ||
| ``` | ||
|
|
||
| By the time the current fiber calls `#wait` we'll have incremented the counter 16 times and decremented it another 16 times; we always increment before we decrement, so we'll never reach a negative counter (that would raise). When the fiber calls `#wait` the counter may be within 0 and 16. If zero the `#wait` method returns immediately, otherwise it suspends the current fiber. | ||
ysbaddaden marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| ## Dynamic increment & concurrent waiter | ||
|
|
||
| The following program exhibits a situation where a waiter will be resumed early: | ||
|
|
||
| ```crystal | ||
| wg = WaitGroup.new | ||
|
|
||
| spawn do | ||
| wg.wait | ||
| do_something_after_completion | ||
| end | ||
|
|
||
| 16.times do | ||
| wg.add(1) | ||
| spawn { wg.done } | ||
| do_sometimes { Fiber.yield } | ||
| end | ||
| ``` | ||
|
|
||
| The behavior of the loop is identical to the previous example: the counter may reach zero multiple times. The difference is that a concurrent fiber will wait for completion, which is acceptable, yet that fiber is enqueued first, can be resumed at any time and call `#wait` concurrently to the current fiber incrementing the counter. If the counter reaches zero early, the waiting fiber will be resumed early :boom: | ||
|
|
||
| The execution of fibers is, by design, undeterministic: we don't know when they will be executed, and the waiting fiber may be resumed before or while other others increment or decrement the counter. As such, this case can be considered to break the "must increment before we wait" rule. The proper usage is to spawn the waiting fiber after the loop, or to statically set the counter beforehand (`WaitGroup.new(16)`). Concurrent fibers may still increment the counter, as long as they do so before they call `#done`. | ||
|
|
||
| # Unresolved questions | ||
|
|
||
| None. | ||
|
|
||
| # Future possibilities | ||
|
|
||
| WaitGroup may eventually be used to implement higher level constructs, for example structured concurrency, or Erlang-like supervisors. It might also be integrated into `select` expressions to wait alongside channels and timeouts. | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.