Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/crystal/system/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,14 @@ class Thread
getter! execution_context : Fiber::ExecutionContext

# :nodoc:
property! scheduler : Fiber::ExecutionContext::Scheduler
def execution_context=(@execution_context : Fiber::ExecutionContext?)
end

# :nodoc:
getter! scheduler : Fiber::ExecutionContext::Scheduler

# :nodoc:
def execution_context=(@execution_context : Fiber::ExecutionContext) : Fiber::ExecutionContext
main_fiber.execution_context = execution_context
def scheduler=(@scheduler : Fiber::ExecutionContext::Scheduler?)
end

# When a fiber terminates we can't release its stack until we swap context
Expand Down
8 changes: 8 additions & 0 deletions src/crystal/tracing.cr
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ module Crystal
write value.name || '?'
end

# Writes a `Thread`'s system handle to the trace buffer.
#
# NOTE(review): on Linux the handle is rendered as a `Pointer(Void)`
# (presumably because the pthread handle is an opaque integer there);
# other platforms write `@system_handle` as-is — TODO confirm the
# handle's type per platform.
def write(value : Thread) : Nil
{% if flag?(:linux) %}
write Pointer(Void).new(value.@system_handle)
{% else %}
write value.@system_handle
{% end %}
end

{% if flag?(:execution_context) %}
def write(value : Fiber::ExecutionContext) : Nil
write value.name
Expand Down
7 changes: 7 additions & 0 deletions src/fiber/execution_context.cr
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,14 @@ require "./execution_context/*"
# ```
@[Experimental]
module Fiber::ExecutionContext
@@thread_pool : ThreadPool?
@@default : ExecutionContext::Parallel?

# :nodoc:
#
# Returns the process-wide `ThreadPool`. Raises (via `not_nil!` with the
# given message) if `.init_default_context` hasn't been called yet to
# assign `@@thread_pool`.
def self.thread_pool : ThreadPool
@@thread_pool.not_nil!("expected thread pool to have been setup")
end

# Returns the default `ExecutionContext` for the process, automatically
# started when the program started.
#
Expand All @@ -84,6 +90,7 @@ module Fiber::ExecutionContext

# :nodoc:
#
# One-time process setup: creates the shared `ThreadPool`, the default
# `Parallel` execution context (initial parallelism of 1), and the
# `Monitor`. Presumably called once during program startup — the class
# vars are assigned without synchronization; TODO confirm single-call.
def self.init_default_context : Nil
@@thread_pool = ThreadPool.new
@@default = Parallel.default(1)
@@monitor = Monitor.new
end
Expand Down
27 changes: 18 additions & 9 deletions src/fiber/execution_context/isolated.cr
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ module Fiber::ExecutionContext
@mutex = Thread::Mutex.new
@condition = Thread::ConditionVariable.new
protected getter thread : Thread
@main_fiber : Fiber
protected getter main_fiber : Fiber

# :nodoc:
getter(event_loop : Crystal::EventLoop) do
Expand All @@ -66,19 +66,23 @@ module Fiber::ExecutionContext
def initialize(@name : String, @spawn_context : ExecutionContext = ExecutionContext.default, &@func : ->)
@thread = uninitialized Thread
@main_fiber = uninitialized Fiber
@main_fiber = Fiber.new(@name, allocate_stack, self) { run }
@thread = start_thread
ExecutionContext.execution_contexts.push(self)
end

# Allocates a fresh, guarded fiber stack of `StackPool::STACK_SIZE`
# bytes and wraps it in a `Stack` marked `reusable: true`.
private def allocate_stack : Stack
# no stack pool: we directly allocate a stack; it will be automatically
# released when the thread is returned to the thread pool
pointer = Crystal::System::Fiber.allocate_stack(StackPool::STACK_SIZE, protect: true)
Stack.new(pointer, StackPool::STACK_SIZE, reusable: true)
end

private def start_thread : Thread
Thread.new(name: @name) do |thread|
@thread = thread
thread.execution_context = self
thread.scheduler = self
thread.main_fiber.name = @name
@main_fiber = thread.main_fiber
run
end
ExecutionContext.thread_pool.checkout(self)
end

protected def thread=(@thread)
end

# :nodoc:
Expand Down Expand Up @@ -136,6 +140,11 @@ module Fiber::ExecutionContext
protected def reschedule : Nil
Crystal.trace :sched, "reschedule"

if @main_fiber.dead?
ExecutionContext.thread_pool.checkin
return # actually unreachable
end

if event_loop = @event_loop
wait_for(event_loop)
else
Expand Down
66 changes: 26 additions & 40 deletions src/fiber/execution_context/parallel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ module Fiber::ExecutionContext
@spinning = Atomic(Int32).new(0)

# :nodoc:
#
# Starts the default execution context. There can be only one for the whole
# process. Must be called from the main thread's main fiber; associates the
# current thread and fiber to the created execution context.
protected def self.default(maximum : Int32) : self
new("DEFAULT", maximum, hijack: true)
Fiber.current.execution_context = new("DEFAULT", maximum, hijack: true)
end

# Starts a `Parallel` context with a *maximum* parallelism. The context
Expand All @@ -98,28 +102,27 @@ module Fiber::ExecutionContext
new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false)
end

protected def initialize(@name : String, @capacity : Int32, hijack : Bool)
raise ArgumentError.new("Parallelism can't be less than one.") if @capacity < 1
protected def initialize(@name : String, capacity : Int32, hijack : Bool)
raise ArgumentError.new("Parallelism can't be less than one.") if capacity < 1

@mutex = Thread::Mutex.new
@condition = Thread::ConditionVariable.new

@global_queue = GlobalQueue.new(@mutex)
@schedulers = Array(Scheduler).new(capacity)
@threads = Array(Thread).new(capacity)
@event_loop = Crystal::EventLoop.create(capacity)

@started = hijack ? 1 : 0
@rng = Random::PCG32.new

start_schedulers(capacity)
@threads << hijack_current_thread(@schedulers.first) if hijack
hijack_current_thread(@schedulers.first) if hijack

ExecutionContext.execution_contexts.push(self)
end

# The number of threads that have been started.
def size : Int32
@threads.size
@started
end

# The maximum number of threads that can be started.
Expand All @@ -134,12 +137,11 @@ module Fiber::ExecutionContext

# Starts all schedulers at once.
#
# We could lazily initialize them as needed, like we do for threads, which
# would be safe as long as we only mutate when the mutex is locked... but
# unlike @threads, we do iterate the schedulers in #steal without locking
# the mutex (for obvious reasons) and there are no guarantees that the new
# schedulers.@size will be written after the scheduler has been written to
# the array's buffer.
# We could lazily initialize them as needed, would be safe as long as we
# only mutate when the mutex is locked, but we iterate the schedulers in
# #steal without locking the mutex (for obvious reasons) and there are no
# guarantees that the new schedulers.@size will be written after the
# scheduler has been written to the array's buffer.
#
# OPTIMIZE: consider storing schedulers to an array-like object that would
# use an atomic/fence to make sure that @size can only be incremented
Expand All @@ -156,32 +158,18 @@ module Fiber::ExecutionContext

# Attaches *scheduler* to the current `Thread`, usually the process' main
# thread. Starts a `Fiber` to run the scheduler loop.
private def hijack_current_thread(scheduler) : Thread
private def hijack_current_thread(scheduler) : Nil
thread = Thread.current
thread.internal_name = scheduler.name
thread.execution_context = self
thread.scheduler = scheduler

scheduler.thread = thread
scheduler.main_fiber = Fiber.new("#{scheduler.name}:loop", self) do
scheduler.run_loop
end

thread
end

# Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop
# directly in the thread's main `Fiber`.
private def start_thread(scheduler) : Thread
Thread.new(name: scheduler.name) do |thread|
thread.execution_context = self
thread.scheduler = scheduler

scheduler.thread = thread
scheduler.main_fiber = thread.main_fiber
scheduler.main_fiber.name = "#{scheduler.name}:loop"
scheduler.run_loop
end
# Attaches *scheduler* to a thread obtained from the shared pool.
# NOTE(review): `ThreadPool#checkout` is not visible here — presumably it
# reuses a parked thread or spawns one, then runs the scheduler's loop;
# verify against `ExecutionContext.thread_pool`'s implementation.
private def start_thread(scheduler) : Nil
ExecutionContext.thread_pool.checkout(scheduler)
end

# Resizes the context to the new *maximum* parallelism.
Expand All @@ -203,25 +191,21 @@ module Fiber::ExecutionContext
# and eventually assign the copy as @schedulers; this way #steal can
# safely access the array (never mutated).
new_capacity = maximum
old_threads = @threads
old_schedulers = @schedulers
old_capacity = capacity

if new_capacity > old_capacity
@schedulers = Array(Scheduler).new(new_capacity) do |index|
old_schedulers[index]? || start_scheduler(index)
end
threads = Array(Thread).new(new_capacity)
old_threads.each { |thread| threads << thread }
@threads = threads
elsif new_capacity < old_capacity
# tell the overflow schedulers to shutdown
removed_schedulers = old_schedulers[new_capacity..]
removed_schedulers.each(&.shutdown!)

# resize
@schedulers = old_schedulers[0...new_capacity]
@threads = old_threads[0...new_capacity]
@started = new_capacity if @started > new_capacity

# reset @parked counter (we wake all parked threads) so they can
# shutdown (if told to):
Expand Down Expand Up @@ -294,6 +278,8 @@ module Fiber::ExecutionContext
Crystal.trace :sched, "park"
@parked.add(1, :acquire_release)

# TODO: detach the scheduler and return the thread back into ThreadPool
# instead
@condition.wait(@mutex)

# we don't decrement @parked because #wake_scheduler did
Expand Down Expand Up @@ -355,13 +341,13 @@ module Fiber::ExecutionContext
# check if we can start another thread; no need for atomics, the values
# shall be rather stable over time and we check them again inside the
# mutex
return if @threads.size >= capacity
return if @started >= capacity

@mutex.synchronize do
index = @threads.size
return if index >= capacity # check again

@threads << start_thread(@schedulers[index])
if (index = @started) < capacity
start_thread(@schedulers[index])
@started += 1
end
end
end

Expand Down
3 changes: 2 additions & 1 deletion src/fiber/execution_context/parallel/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module Fiber::ExecutionContext
# :nodoc:
property execution_context : Parallel
protected property! thread : Thread
protected property! main_fiber : Fiber
protected property main_fiber : Fiber

@global_queue : GlobalQueue
@runnables : Runnables(256)
Expand All @@ -35,6 +35,7 @@ module Fiber::ExecutionContext
@global_queue = @execution_context.global_queue
@runnables = Runnables(256).new(@global_queue)
@event_loop = @execution_context.event_loop
@main_fiber = Fiber.new("#{@name}:loop", @execution_context) { run_loop }
end

protected def shutdown! : Nil
Expand Down
Loading