diff --git a/spec/std/fiber/execution_context/parallel_spec.cr b/spec/std/fiber/execution_context/parallel_spec.cr
index f049073b88a7..f41259450d77 100644
--- a/spec/std/fiber/execution_context/parallel_spec.cr
+++ b/spec/std/fiber/execution_context/parallel_spec.cr
@@ -1,5 +1,6 @@
 {% skip_file unless flag?(:execution_context) %}
 require "spec"
+require "wait_group"

 describe Fiber::ExecutionContext::Parallel do
   it ".new" do
@@ -41,4 +42,43 @@ describe Fiber::ExecutionContext::Parallel do
       Fiber::ExecutionContext::Parallel.new("test", size: 5..1)
     end
   end
+
+  it "#resize" do
+    ctx = Fiber::ExecutionContext::Parallel.new("ctx", 1)
+    running = Atomic(Bool).new(true)
+    wg = WaitGroup.new
+
+    10.times do
+      wg.add(1)
+
+      ctx.spawn do
+        while running.get(:relaxed)
+          sleep(10.microseconds)
+        end
+      ensure
+        wg.done
+      end
+    end
+
+    # it grows
+    ctx.resize(4)
+    ctx.capacity.should eq(4)
+
+    # it shrinks
+    ctx.resize(2)
+    ctx.capacity.should eq(2)
+
+    # it doesn't change
+    ctx.resize(2)
+    ctx.capacity.should eq(2)
+
+    10.times do
+      n = rand(1..4)
+      ctx.resize(n)
+      ctx.capacity.should eq(n)
+    end
+
+    running.set(false)
+    wg.wait
+  end
 end
diff --git a/spec/std/fiber/execution_context/runnables_spec.cr b/spec/std/fiber/execution_context/runnables_spec.cr
index 3930bf2f07a6..87d9566e0e8c 100644
--- a/spec/std/fiber/execution_context/runnables_spec.cr
+++ b/spec/std/fiber/execution_context/runnables_spec.cr
@@ -71,6 +71,32 @@ describe Fiber::ExecutionContext::Runnables do
     end
   end

+  describe "#drain" do
+    it "drains the local queue into the global queue" do
+      fibers = 6.times.map { |i| new_fake_fiber("f#{i}") }.to_a
+
+      # local enqueue + overflow
+      g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
+      r = Fiber::ExecutionContext::Runnables(6).new(g)
+
+      # empty
+      r.drain
+      g.size.should eq(0)
+
+      # full
+      fibers.each { |f| r.push(f) }
+      r.drain
+      r.shift?.should be_nil
+      g.size.should eq(6)
+
+      # refill half (1 pop + 2 grab) and drain again
+      g.unsafe_grab?(r, divisor: 1)
+      r.drain
+      r.shift?.should be_nil
+      g.size.should eq(5)
+    end
+  end
+
   describe "#bulk_push" do
     it "fills the local queue" do
       l = Fiber::List.new
diff --git a/src/fiber/execution_context/parallel.cr b/src/fiber/execution_context/parallel.cr
index 2052ff6588ed..1fa32eed5cd1 100644
--- a/src/fiber/execution_context/parallel.cr
+++ b/src/fiber/execution_context/parallel.cr
@@ -69,7 +69,6 @@ module Fiber::ExecutionContext

     @parked = Atomic(Int32).new(0)
     @spinning = Atomic(Int32).new(0)
-    @capacity : Int32

     # :nodoc:
     protected def self.default(maximum : Int32) : self
@@ -102,12 +101,12 @@ module Fiber::ExecutionContext
       @condition = Thread::ConditionVariable.new

       @global_queue = GlobalQueue.new(@mutex)
-      @schedulers = Array(Scheduler).new(@capacity)
-      @threads = Array(Thread).new(@capacity)
+      @schedulers = Array(Scheduler).new(capacity)
+      @threads = Array(Thread).new(capacity)

       @rng = Random::PCG32.new

-      start_schedulers
+      start_schedulers(capacity)
       @threads << hijack_current_thread(@schedulers.first) if hijack

       ExecutionContext.execution_contexts.push(self)
@@ -120,7 +119,7 @@ module Fiber::ExecutionContext

     # The maximum number of threads that can be started.
     def capacity : Int32
-      @capacity
+      @schedulers.size
     end

     # :nodoc:
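Together with the spec above, the new API reads as follows. This is a hypothetical usage sketch, not part of the patch: the context name, fiber count, and sizes are illustrative, and a build with the `execution_context` flag is assumed.

```crystal
require "wait_group"

# start small: a single scheduler thread
ctx = Fiber::ExecutionContext::Parallel.new("workers", 1)

wg = WaitGroup.new(100)
100.times do
  ctx.spawn do
    # ... some work ...
  ensure
    wg.done
  end
end

# scale up to absorb the burst: three more schedulers are created,
# and threads are started on demand
ctx.resize(4)
ctx.capacity # => 4

wg.wait

# scale back down: the overflow schedulers shut down cooperatively
ctx.resize(1)
ctx.capacity # => 1
```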
@@ -140,7 +139,7 @@ module Fiber::ExecutionContext
     # OPTIMIZE: consider storing schedulers to an array-like object that would
     # use an atomic/fence to make sure that @size can only be incremented
     # *after* the value has been written to @buffer.
-    private def start_schedulers
+    private def start_schedulers(capacity)
       capacity.times do |index|
         @schedulers << Scheduler.new(self, "#{@name}-#{index}")
       end
@@ -176,6 +175,71 @@ module Fiber::ExecutionContext
       end
     end

+    # Resizes the context to the new *maximum* parallelism.
+    #
+    # The new *maximum* can grow, in which case more schedulers are created to
+    # eventually increase the parallelism.
+    #
+    # The new *maximum* can also shrink, in which case the overflow schedulers
+    # are removed and told to shut down immediately. The actual shutdown is
+    # cooperative, so running schedulers won't stop until their current fiber
+    # tries to switch to another fiber.
+    def resize(maximum : Int32) : Nil
+      raise ArgumentError.new("Parallelism can't be less than one.") if maximum < 1
+      removed_schedulers = nil
+
+      @mutex.synchronize do
+        # can run in parallel with #steal, which dereferences @schedulers
+        # (once) without locking the mutex, so we dup the schedulers, mutate
+        # the copy, and eventually assign the copy as @schedulers; this way
+        # #steal can safely access the array (it is never mutated)
+        new_capacity = maximum
+        old_threads = @threads
+        old_schedulers = @schedulers
+        old_capacity = capacity
+
+        if new_capacity > old_capacity
+          @schedulers = Array(Scheduler).new(new_capacity) do |index|
+            old_schedulers[index]? || Scheduler.new(self, "#{@name}-#{index}")
+          end
+          threads = Array(Thread).new(new_capacity)
+          old_threads.each { |thread| threads << thread }
+          @threads = threads
+        elsif new_capacity < old_capacity
+          # tell the overflow schedulers to shut down
+          removed_schedulers = old_schedulers[new_capacity..]
+          removed_schedulers.each(&.shutdown!)
+
+          # resize
+          @schedulers = old_schedulers[0...new_capacity]
+          @threads = old_threads[0...new_capacity]
+
+          # reset the @parked counter (we wake all parked threads) so they can
+          # shut down (if told to):
+          woken_threads = @parked.get(:relaxed)
+          @parked.set(0, :relaxed)
+
+          # update @spinning prior to unparking threads; we use acquire-release
+          # semantics to make sure that all the above stores are visible before
+          # the following wakeup calls (maybe not needed, but let's err on the
+          # safe side)
+          @spinning.add(woken_threads, :acquire_release)
+
+          # wake every waiting thread:
+          @condition.broadcast
+          @event_loop.interrupt
+        end
+      end
+
+      return unless removed_schedulers
+
+      # drain the local queues of the removed schedulers since they're no
+      # longer available for stealing
+      removed_schedulers.each do |scheduler|
+        scheduler.@runnables.drain
+      end
+    end
+
     # :nodoc:
     def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
       raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread
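The comment inside `@mutex.synchronize` is the crux of the change: `#steal` reads `@schedulers` without taking the mutex, so the published array must never be mutated in place, only replaced wholesale. A minimal standalone sketch of that copy-on-write idiom (the `Registry` class is hypothetical, not part of the patch):

```crystal
class Registry(T)
  @items = [] of T
  @mutex = Thread::Mutex.new

  # writers serialize on the mutex, never mutate the published array,
  # and only swap in the copy once it is fully built
  def add(item : T) : Nil
    @mutex.synchronize do
      items = @items.dup
      items << item
      @items = items
    end
  end

  # readers take a single snapshot of the reference and iterate it
  # without locking; a concurrent #add swaps @items but can't mutate
  # the snapshot this reader holds
  def each(& : T ->) : Nil
    items = @items
    items.each { |item| yield item }
  end
end

registry = Registry(String).new
registry.add("scheduler-0")
registry.add("scheduler-1")
registry.each { |name| puts name }
```

This is also why `#steal` below now loads `schedulers = @schedulers` once instead of dereferencing the instance variable on every access.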
@@ -200,11 +264,12 @@ module Fiber::ExecutionContext
     protected def steal(& : Scheduler ->) : Nil
       return if capacity == 1

+      schedulers = @schedulers
       i = @rng.next_int
-      n = @schedulers.size
+      n = schedulers.size

       n.times do |j|
-        if scheduler = @schedulers[(i &+ j) % n]?
+        if scheduler = schedulers[(i &+ j) % n]?
           yield scheduler
         end
       end
@@ -271,8 +336,8 @@ module Fiber::ExecutionContext
       # we must also decrement the number of parked threads because another
       # thread could lock the mutex and increment @spinning again before the
       # signaled thread is resumed
-      spinning = @spinning.add(1, :acquire_release)
-      parked = @parked.sub(1, :acquire_release)
+      @spinning.add(1, :acquire_release)
+      @parked.sub(1, :acquire_release)

       @condition.signal
     end
@@ -282,11 +347,11 @@ module Fiber::ExecutionContext
       # check if we can start another thread; no need for atomics, the values
       # shall be rather stable over time and we check them again inside the
       # mutex
-      return if @threads.size == capacity
+      return if @threads.size >= capacity

       @mutex.synchronize do
         index = @threads.size
-        return if index == capacity # check again
+        return if index >= capacity # check again

         @threads << start_thread(@schedulers[index])
       end
diff --git a/src/fiber/execution_context/parallel/scheduler.cr b/src/fiber/execution_context/parallel/scheduler.cr
index cf8df4c74bdd..b91d96cc31ac 100644
--- a/src/fiber/execution_context/parallel/scheduler.cr
+++ b/src/fiber/execution_context/parallel/scheduler.cr
@@ -29,6 +29,7 @@ module Fiber::ExecutionContext
     @spinning = false
     @waiting = false
     @parked = false
+    @shutdown = false

     protected def initialize(@execution_context, @name)
       @global_queue = @execution_context.global_queue
@@ -36,6 +37,10 @@ module Fiber::ExecutionContext
       @event_loop = @execution_context.event_loop
     end

+    protected def shutdown! : Nil
+      @shutdown = true
+    end
+
     # :nodoc:
     def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
       raise RuntimeError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread
@@ -86,6 +91,8 @@ module Fiber::ExecutionContext
     end

     private def quick_dequeue? : Fiber?
+      return if @shutdown
+
       # every once in a while: dequeue from global queue to avoid two fibers
       # constantly respawning each other to completely occupy the local queue
       if (@tick &+= 1) % 61 == 0
@@ -121,8 +128,21 @@ module Fiber::ExecutionContext
       Crystal.trace :sched, "started"

       loop do
+        if @shutdown
+          spin_stop
+          @runnables.drain
+
+          # we may have been the last running scheduler (e.g. waiting on the
+          # event loop while there are pending events); let's wake another
+          # scheduler to take our place
+          @execution_context.wake_scheduler
+
+          Crystal.trace :sched, "shutdown"
+          break
+        end
+
         if fiber = find_next_runnable
-          spin_stop if @spinning
+          spin_stop
           resume fiber
         else
           # the event loop enqueued a fiber (or was interrupted) or the
@@ -145,6 +165,8 @@ module Fiber::ExecutionContext

       # nothing to do: start spinning
       spinning do
+        return if @shutdown
+
         yield @global_queue.grab?(@runnables, divisor: @execution_context.size)

         if @execution_context.lock_evloop? { @event_loop.run(pointerof(list), blocking: false) }
@@ -189,10 +211,12 @@ module Fiber::ExecutionContext
       # loop: park the thread until another scheduler or another context
       # enqueues a fiber
       @execution_context.park_thread do
+        # don't park the thread when told to shut down
+        return if @shutdown
+
         # by the time we acquire the lock, another thread may have enqueued
         # fiber(s) and already tried to wake up a thread (race) so we must
         # check again; we don't check the scheduler's local queue (it's empty)
-
         yield @global_queue.unsafe_grab?(@runnables, divisor: @execution_context.size)
         yield try_steal?
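All the scheduler changes follow a single pattern: `shutdown!` only flips a flag, and the run loop, the spin loop, and the park path each poll it at a safe point, so a scheduler only ever stops between fibers, never in the middle of one. A reduced, runnable model of that cooperative-shutdown pattern (the `Worker` class is hypothetical; the real scheduler parks on a condition variable rather than sleeping, and relies on the context's locking for visibility of the flag):

```crystal
class Worker
  def initialize
    @shutdown = false
    @jobs = Deque(-> Nil).new
    @mutex = Thread::Mutex.new
  end

  # only flips the flag: the worker notices it at its next checkpoint,
  # so a running job is never interrupted
  def shutdown! : Nil
    @shutdown = true
  end

  def push(&job : -> Nil) : Nil
    @mutex.synchronize { @jobs.push(job) }
  end

  def run : Nil
    loop do
      # checkpoint: don't pick up (or wait for) more work once told to stop
      break if @shutdown

      if job = @mutex.synchronize { @jobs.shift? }
        job.call
      else
        sleep(1.millisecond) # idle checkpoint; the real scheduler parks here
      end
    end
  end
end

worker = Worker.new
3.times { |i| worker.push { puts "job #{i}" } }

spawn do
  sleep(10.milliseconds)
  worker.shutdown!
end

worker.run
```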
diff --git a/src/fiber/execution_context/runnables.cr b/src/fiber/execution_context/runnables.cr
index cc8be5322498..674461a03402 100644
--- a/src/fiber/execution_context/runnables.cr
+++ b/src/fiber/execution_context/runnables.cr
@@ -75,25 +75,54 @@ module Fiber::ExecutionContext

       # first, try to grab half of the fibers from local queue
       batch = uninitialized Fiber[N] # actually N // 2 + 1 but that doesn't compile
-      n.times do |i|
-        batch.to_unsafe[i] = @buffer.to_unsafe[(head &+ i) % N]
-      end
-      _, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire)
+      _, success = try_grab(batch.to_unsafe, head, n)
       return false unless success

-      # append fiber to the batch
+      # append fiber to the batch and push to global queue
       batch.to_unsafe[n] = fiber
+      push_to_global_queue(batch.to_unsafe, n &+ 1)
+      true
+    end

-      # link the fibers
+    # Transfers every fiber in the local runnables queue to the global queue.
+    # This will grab the global lock.
+    #
+    # Can be executed by any scheduler.
+    def drain : Nil
+      batch = uninitialized Fiber[N]
+      n = 0
+
+      head = @head.get(:acquire) # sync with other consumers
+      loop do
+        tail = @tail.get(:acquire) # sync with the producer
+
+        n = (tail &- head)
+        return if n == 0 # queue is empty
+
+        # try to grab everything from local queue
+        head, success = try_grab(batch.to_unsafe, head, n)
+        break if success
+      end
+
+      push_to_global_queue(batch.to_unsafe, n)
+    end
+
+    private def try_grab(batch, head, n)
       n.times do |i|
-        batch.to_unsafe[i].list_next = batch.to_unsafe[i &+ 1]
+        batch[i] = @buffer.to_unsafe[(head &+ i) % N]
       end
-      list = Fiber::List.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32)
+      @head.compare_and_set(head, head &+ n, :acquire_release, :acquire)
+    end

-      # now put the batch on global queue (grabs the global lock)
-      @global_queue.bulk_push(pointerof(list))
+    private def push_to_global_queue(batch, n)
+      # link the fibers
+      (n &- 1).times do |i|
+        batch[i].list_next = batch[i &+ 1]
+      end
+      list = Fiber::List.new(batch[0], batch[n &- 1], size: n.to_i32)

-      true
+      # and put the batch on global queue (grabs the global lock)
+      @global_queue.bulk_push(pointerof(list))
     end

     # Tries to enqueue all the fibers in *list* into the local queue. If the
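`#drain` claims every occupied slot with a single compare-and-set on `head`: it snapshots `head`, reads `tail`, and tries to advance `head` by `n = tail - head`; when another consumer wins the race, the CAS returns the freshly installed `head` and the loop retries from there. A cut-down model of just that claim loop, using plain `Atomic(Int32)` counters (the values are illustrative):

```crystal
head = Atomic(Int32).new(0)
tail = Atomic(Int32).new(7) # pretend a producer already published 7 items

claimed = 0
h = head.get(:acquire) # sync with other consumers

loop do
  t = tail.get(:acquire) # sync with the producer

  n = t &- h
  break if n == 0 # queue is empty: nothing to claim

  # try to claim all n slots at once; on failure the CAS returns the
  # head another consumer installed, so we retry with that value
  h, success = head.compare_and_set(h, h &+ n, :acquire_release, :acquire)
  if success
    claimed = n
    break
  end
end

puts claimed # => 7
```

Unlike `#steal`, which moves fibers over one victim at a time, `#drain` empties the whole local queue in one claimed batch and takes the global lock exactly once, via `bulk_push`.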