diff --git a/spec/std/fiber/execution_context/parallel_spec.cr b/spec/std/fiber/execution_context/parallel_spec.cr index cb358577f453..f049073b88a7 100644 --- a/spec/std/fiber/execution_context/parallel_spec.cr +++ b/spec/std/fiber/execution_context/parallel_spec.cr @@ -7,14 +7,16 @@ describe Fiber::ExecutionContext::Parallel do mt.size.should eq(0) mt.capacity.should eq(2) - expect_raises(ArgumentError, "needs at least one thread") do + expect_raises(ArgumentError, "Parallelism can't be less than one") do Fiber::ExecutionContext::Parallel.new("test", maximum: -1) end - expect_raises(ArgumentError, "needs at least one thread") do + expect_raises(ArgumentError, "Parallelism can't be less than one") do Fiber::ExecutionContext::Parallel.new("test", maximum: 0) end + # the following are deprecated constructors + mt = Fiber::ExecutionContext::Parallel.new("test", size: 0..2) mt.size.should eq(0) mt.capacity.should eq(2) @@ -24,14 +26,14 @@ describe Fiber::ExecutionContext::Parallel do mt.capacity.should eq(4) mt = Fiber::ExecutionContext::Parallel.new("test", size: 1..5) - mt.size.should eq(1) + mt.size.should eq(0) mt.capacity.should eq(5) mt = Fiber::ExecutionContext::Parallel.new("test", size: 1...5) - mt.size.should eq(1) + mt.size.should eq(0) mt.capacity.should eq(4) - expect_raises(ArgumentError, "needs at least one thread") do + expect_raises(ArgumentError, "Parallelism can't be less than one.") do Fiber::ExecutionContext::Parallel.new("test", size: 0...1) end diff --git a/src/fiber/execution_context/parallel.cr b/src/fiber/execution_context/parallel.cr index 2e041003936f..2052ff6588ed 100644 --- a/src/fiber/execution_context/parallel.cr +++ b/src/fiber/execution_context/parallel.cr @@ -69,56 +69,46 @@ module Fiber::ExecutionContext @parked = Atomic(Int32).new(0) @spinning = Atomic(Int32).new(0) - @size : Range(Int32, Int32) + @capacity : Int32 # :nodoc: protected def self.default(maximum : Int32) : self - new("DEFAULT", 1..maximum, hijack: true) + new("DEFAULT", maximum, hijack: true) end - # Starts a context with a *maximum* number of threads. Threads aren't started - # right away but will be started as needed to increase parallelism up to the - # configured maximum. + # Starts a context with a *maximum* parallelism. The context starts with an + # initial parallelism of zero. It will grow to one when a fiber is spawned, + # then the actual parallelism will keep increasing and decreasing as needed, + # but will never go past the configured *maximum*. def self.new(name : String, maximum : Int32) : self - new(name, 0..maximum) + new(name, maximum, hijack: false) end - # Starts a context with a *maximum* number of threads. Threads aren't started - # right away but will be started as needed to increase parallelism up to the - # configured maximum. + @[Deprecated("Use Fiber::ExecutionContext::Parallel.new(String, Int32) instead.")] def self.new(name : String, size : Range(Nil, Int32)) : self - new(name, Range.new(0, size.end, size.exclusive?)) + new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false) end - # Starts a context with a minimum and maximum number of threads. Only the - # minimum number of threads will be started right away. The minimum can be 0 - # (or nil) in which case no threads will be started. More threads will be - # started as needed to increase parallelism up to the configured maximum. + @[Deprecated("Use Fiber::ExecutionContext::Parallel.new(String, Int32) instead.")] def self.new(name : String, size : Range(Int32, Int32)) : self - new(name, size, hijack: false) + raise ArgumentError.new("invalid range") if size.begin > size.end + new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false) end - protected def initialize(@name : String, size : Range(Int32, Int32), hijack : Bool) - @size = - if size.exclusive? - (size.begin)..(size.end - 1) - else - size - end - raise ArgumentError.new("#{self.class.name} needs at least one thread") if capacity < 1 - raise ArgumentError.new("#{self.class.name} invalid range") if @size.begin > @size.end + protected def initialize(@name : String, @capacity : Int32, hijack : Bool) + raise ArgumentError.new("Parallelism can't be less than one.") if @capacity < 1 @mutex = Thread::Mutex.new @condition = Thread::ConditionVariable.new @global_queue = GlobalQueue.new(@mutex) - @schedulers = Array(Scheduler).new(capacity) - @threads = Array(Thread).new(capacity) + @schedulers = Array(Scheduler).new(@capacity) + @threads = Array(Thread).new(@capacity) @rng = Random::PCG32.new start_schedulers - start_initial_threads(hijack) + @threads << hijack_current_thread(@schedulers.first) if hijack ExecutionContext.execution_contexts.push(self) end @@ -130,7 +120,7 @@ module Fiber::ExecutionContext # The maximum number of threads that can be started. def capacity : Int32 - @size.end + @capacity end # :nodoc: @@ -156,19 +146,6 @@ module Fiber::ExecutionContext end end - private def start_initial_threads(hijack) - offset = 0 - - if hijack - @threads << hijack_current_thread(@schedulers[0]) - offset += 1 - end - - offset.upto(@size.begin - 1) do |index| - @threads << start_thread(@schedulers[index]) - end - end - # Attaches *scheduler* to the current `Thread`, usually the process' main # thread. Starts a `Fiber` to run the scheduler loop. private def hijack_current_thread(scheduler) : Thread @@ -221,7 +198,7 @@ module Fiber::ExecutionContext # Picks a scheduler at random then iterates all schedulers to try to steal # fibers from. protected def steal(& : Scheduler ->) : Nil - return if size == 1 + return if capacity == 1 i = @rng.next_int n = @schedulers.size