Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions spec/std/fiber/execution_context/parallel_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ describe Fiber::ExecutionContext::Parallel do
mt.size.should eq(0)
mt.capacity.should eq(2)

expect_raises(ArgumentError, "needs at least one thread") do
expect_raises(ArgumentError, "Parallelism can't be less than one") do
Fiber::ExecutionContext::Parallel.new("test", maximum: -1)
end

expect_raises(ArgumentError, "needs at least one thread") do
expect_raises(ArgumentError, "Parallelism can't be less than one") do
Fiber::ExecutionContext::Parallel.new("test", maximum: 0)
end

# the following are deprecated constructors

mt = Fiber::ExecutionContext::Parallel.new("test", size: 0..2)
mt.size.should eq(0)
mt.capacity.should eq(2)
Expand All @@ -24,14 +26,14 @@ describe Fiber::ExecutionContext::Parallel do
mt.capacity.should eq(4)

mt = Fiber::ExecutionContext::Parallel.new("test", size: 1..5)
mt.size.should eq(1)
mt.size.should eq(0)
mt.capacity.should eq(5)

mt = Fiber::ExecutionContext::Parallel.new("test", size: 1...5)
mt.size.should eq(1)
mt.size.should eq(0)
mt.capacity.should eq(4)

expect_raises(ArgumentError, "needs at least one thread") do
expect_raises(ArgumentError, "Parallelism can't be less than one.") do
Fiber::ExecutionContext::Parallel.new("test", size: 0...1)
end

Expand Down
61 changes: 19 additions & 42 deletions src/fiber/execution_context/parallel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -69,56 +69,46 @@ module Fiber::ExecutionContext

@parked = Atomic(Int32).new(0)
@spinning = Atomic(Int32).new(0)
@size : Range(Int32, Int32)
@capacity : Int32

# :nodoc:
protected def self.default(maximum : Int32) : self
new("DEFAULT", 1..maximum, hijack: true)
new("DEFAULT", maximum, hijack: true)
end

# Starts a context with a *maximum* number of threads. Threads aren't started
# right away but will be started as needed to increase parallelism up to the
# configured maximum.
# Starts a context with a *maximum* parallelism. The context starts with an
# initial parallelism of zero. It will grow to one when a fiber is spawned,
# then the actual parallelism will keep increasing and decreasing as needed,
# but will never go past the configured *maximum*.
def self.new(name : String, maximum : Int32) : self
new(name, 0..maximum)
new(name, maximum, hijack: false)
end

# Starts a context with a *maximum* number of threads. Threads aren't started
# right away but will be started as needed to increase parallelism up to the
# configured maximum.
@[Deprecated("Use Fiber::ExecutionContext::Parallel.new(String, Int32) instead.")]
def self.new(name : String, size : Range(Nil, Int32)) : self
new(name, Range.new(0, size.end, size.exclusive?))
new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false)
end

# Starts a context with a minimum and maximum number of threads. Only the
# minimum number of threads will be started right away. The minimum can be 0
# (or nil) in which case no threads will be started. More threads will be
# started as needed to increase parallelism up to the configured maximum.
@[Deprecated("Use Fiber::ExecutionContext::Parallel.new(String, Int32) instead.")]
def self.new(name : String, size : Range(Int32, Int32)) : self
new(name, size, hijack: false)
raise ArgumentError.new("invalid range") if size.begin > size.end
new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false)
end

protected def initialize(@name : String, size : Range(Int32, Int32), hijack : Bool)
@size =
if size.exclusive?
(size.begin)..(size.end - 1)
else
size
end
raise ArgumentError.new("#{self.class.name} needs at least one thread") if capacity < 1
raise ArgumentError.new("#{self.class.name} invalid range") if @size.begin > @size.end
protected def initialize(@name : String, @capacity : Int32, hijack : Bool)
raise ArgumentError.new("Parallelism can't be less than one.") if @capacity < 1

@mutex = Thread::Mutex.new
@condition = Thread::ConditionVariable.new

@global_queue = GlobalQueue.new(@mutex)
@schedulers = Array(Scheduler).new(capacity)
@threads = Array(Thread).new(capacity)
@schedulers = Array(Scheduler).new(@capacity)
@threads = Array(Thread).new(@capacity)

@rng = Random::PCG32.new

start_schedulers
start_initial_threads(hijack)
@threads << hijack_current_thread(@schedulers.first) if hijack

ExecutionContext.execution_contexts.push(self)
end
Expand All @@ -130,7 +120,7 @@ module Fiber::ExecutionContext

# The maximum number of threads that can be started.
def capacity : Int32
@size.end
@capacity
end

# :nodoc:
Expand All @@ -156,19 +146,6 @@ module Fiber::ExecutionContext
end
end

private def start_initial_threads(hijack)
offset = 0

if hijack
@threads << hijack_current_thread(@schedulers[0])
offset += 1
end

offset.upto(@size.begin - 1) do |index|
@threads << start_thread(@schedulers[index])
end
end

# Attaches *scheduler* to the current `Thread`, usually the process' main
# thread. Starts a `Fiber` to run the scheduler loop.
private def hijack_current_thread(scheduler) : Thread
Expand Down Expand Up @@ -221,7 +198,7 @@ module Fiber::ExecutionContext
# Picks a scheduler at random then iterates all schedulers to try to steal
# fibers from.
protected def steal(& : Scheduler ->) : Nil
return if size == 1
return if capacity == 1

i = @rng.next_int
n = @schedulers.size
Expand Down