diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 8272b3a84f38..06f8af461ba8 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -84,11 +84,14 @@ class Thread getter! execution_context : Fiber::ExecutionContext # :nodoc: - property! scheduler : Fiber::ExecutionContext::Scheduler + def execution_context=(@execution_context : Fiber::ExecutionContext?) + end + + # :nodoc: + getter! scheduler : Fiber::ExecutionContext::Scheduler # :nodoc: - def execution_context=(@execution_context : Fiber::ExecutionContext) : Fiber::ExecutionContext - main_fiber.execution_context = execution_context + def scheduler=(@scheduler : Fiber::ExecutionContext::Scheduler?) end # When a fiber terminates we can't release its stack until we swap context diff --git a/src/crystal/tracing.cr b/src/crystal/tracing.cr index 912a4bad171b..918d262b9a03 100644 --- a/src/crystal/tracing.cr +++ b/src/crystal/tracing.cr @@ -82,6 +82,14 @@ module Crystal write value.name || '?' end + def write(value : Thread) : Nil + {% if flag?(:linux) %} + write Pointer(Void).new(value.@system_handle) + {% else %} + write value.@system_handle + {% end %} + end + {% if flag?(:execution_context) %} def write(value : Fiber::ExecutionContext) : Nil write value.name diff --git a/src/fiber/execution_context.cr b/src/fiber/execution_context.cr index 8e18ac4bd018..795a07ae22ec 100644 --- a/src/fiber/execution_context.cr +++ b/src/fiber/execution_context.cr @@ -69,8 +69,14 @@ require "./execution_context/*" # ``` @[Experimental] module Fiber::ExecutionContext + @@thread_pool : ThreadPool? @@default : ExecutionContext::Parallel? + # :nodoc: + def self.thread_pool : ThreadPool + @@thread_pool.not_nil!("expected thread pool to have been setup") + end + # Returns the default `ExecutionContext` for the process, automatically # started when the program started. 
# @@ -84,6 +90,7 @@ module Fiber::ExecutionContext # :nodoc: def self.init_default_context : Nil + @@thread_pool = ThreadPool.new @@default = Parallel.default(1) @@monitor = Monitor.new end diff --git a/src/fiber/execution_context/isolated.cr b/src/fiber/execution_context/isolated.cr index 6dacf1966b02..96a4d8fcdf57 100644 --- a/src/fiber/execution_context/isolated.cr +++ b/src/fiber/execution_context/isolated.cr @@ -45,7 +45,7 @@ module Fiber::ExecutionContext @mutex = Thread::Mutex.new @condition = Thread::ConditionVariable.new protected getter thread : Thread - @main_fiber : Fiber + protected getter main_fiber : Fiber # :nodoc: getter(event_loop : Crystal::EventLoop) do @@ -66,19 +66,23 @@ module Fiber::ExecutionContext def initialize(@name : String, @spawn_context : ExecutionContext = ExecutionContext.default, &@func : ->) @thread = uninitialized Thread @main_fiber = uninitialized Fiber + @main_fiber = Fiber.new(@name, allocate_stack, self) { run } @thread = start_thread ExecutionContext.execution_contexts.push(self) end + private def allocate_stack : Stack + # no stack pool: we directly allocate a stack; it will be automatically + # released when the thread is returned to the thread pool + pointer = Crystal::System::Fiber.allocate_stack(StackPool::STACK_SIZE, protect: true) + Stack.new(pointer, StackPool::STACK_SIZE, reusable: true) + end + private def start_thread : Thread - Thread.new(name: @name) do |thread| - @thread = thread - thread.execution_context = self - thread.scheduler = self - thread.main_fiber.name = @name - @main_fiber = thread.main_fiber - run - end + ExecutionContext.thread_pool.checkout(self) + end + + protected def thread=(@thread) end # :nodoc: @@ -136,6 +140,11 @@ module Fiber::ExecutionContext protected def reschedule : Nil Crystal.trace :sched, "reschedule" + if @main_fiber.dead? 
+ ExecutionContext.thread_pool.checkin + return # actually unreachable + end + if event_loop = @event_loop wait_for(event_loop) else diff --git a/src/fiber/execution_context/parallel.cr b/src/fiber/execution_context/parallel.cr index 51f50a37fd13..56ed82665363 100644 --- a/src/fiber/execution_context/parallel.cr +++ b/src/fiber/execution_context/parallel.cr @@ -75,8 +75,12 @@ module Fiber::ExecutionContext @spinning = Atomic(Int32).new(0) # :nodoc: + # + # Starts the default execution context. There can be only one for the whole + # process. Must be called from the main thread's main fiber; associates the + # current thread and fiber to the created execution context. protected def self.default(maximum : Int32) : self - new("DEFAULT", maximum, hijack: true) + Fiber.current.execution_context = new("DEFAULT", maximum, hijack: true) end # Starts a `Parallel` context with a *maximum* parallelism. The context @@ -98,28 +102,27 @@ module Fiber::ExecutionContext new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false) end - protected def initialize(@name : String, @capacity : Int32, hijack : Bool) - raise ArgumentError.new("Parallelism can't be less than one.") if @capacity < 1 + protected def initialize(@name : String, capacity : Int32, hijack : Bool) + raise ArgumentError.new("Parallelism can't be less than one.") if capacity < 1 @mutex = Thread::Mutex.new @condition = Thread::ConditionVariable.new @global_queue = GlobalQueue.new(@mutex) @schedulers = Array(Scheduler).new(capacity) - @threads = Array(Thread).new(capacity) @event_loop = Crystal::EventLoop.create(capacity) - + @started = hijack ? 1 : 0 @rng = Random::PCG32.new start_schedulers(capacity) - @threads << hijack_current_thread(@schedulers.first) if hijack + hijack_current_thread(@schedulers.first) if hijack ExecutionContext.execution_contexts.push(self) end # The number of threads that have been started. 
def size : Int32 - @threads.size + @started end # The maximum number of threads that can be started. @@ -134,12 +137,11 @@ module Fiber::ExecutionContext # Starts all schedulers at once. # - # We could lazily initialize them as needed, like we do for threads, which - # would be safe as long as we only mutate when the mutex is locked... but - # unlike @threads, we do iterate the schedulers in #steal without locking - # the mutex (for obvious reasons) and there are no guarantees that the new - # schedulers.@size will be written after the scheduler has been written to - # the array's buffer. + # We could lazily initialize them as needed, which would be safe as long as we + # only mutate when the mutex is locked, but we iterate the schedulers in + # #steal without locking the mutex (for obvious reasons) and there are no + # guarantees that the new schedulers.@size will be written after the + # scheduler has been written to the array's buffer. # # OPTIMIZE: consider storing schedulers to an array-like object that would # use an atomic/fence to make sure that @size can only be incremented @@ -156,32 +158,18 @@ module Fiber::ExecutionContext # Attaches *scheduler* to the current `Thread`, usually the process' main # thread. Starts a `Fiber` to run the scheduler loop. - private def hijack_current_thread(scheduler) : Thread + private def hijack_current_thread(scheduler) : Nil thread = Thread.current thread.internal_name = scheduler.name thread.execution_context = self thread.scheduler = scheduler - scheduler.thread = thread - scheduler.main_fiber = Fiber.new("#{scheduler.name}:loop", self) do - scheduler.run_loop - end - - thread end # Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop # directly in the thread's main `Fiber`. 
- private def start_thread(scheduler) : Thread - Thread.new(name: scheduler.name) do |thread| - thread.execution_context = self - thread.scheduler = scheduler - - scheduler.thread = thread - scheduler.main_fiber = thread.main_fiber - scheduler.main_fiber.name = "#{scheduler.name}:loop" - scheduler.run_loop - end + private def start_thread(scheduler) : Nil + ExecutionContext.thread_pool.checkout(scheduler) end # Resizes the context to the new *maximum* parallelism. @@ -203,7 +191,6 @@ module Fiber::ExecutionContext # and eventually assign the copy as @schedulers; this way #steal can # safely access the array (never mutated). new_capacity = maximum - old_threads = @threads old_schedulers = @schedulers old_capacity = capacity @@ -211,9 +198,6 @@ module Fiber::ExecutionContext @schedulers = Array(Scheduler).new(new_capacity) do |index| old_schedulers[index]? || start_scheduler(index) end - threads = Array(Thread).new(new_capacity) - old_threads.each { |thread| threads << thread } - @threads = threads elsif new_capacity < old_capacity # tell the overflow schedulers to shutdown removed_schedulers = old_schedulers[new_capacity..] 
@@ -221,7 +205,7 @@ module Fiber::ExecutionContext # resize @schedulers = old_schedulers[0...new_capacity] - @threads = old_threads[0...new_capacity] + @started = new_capacity if @started > new_capacity # reset @parked counter (we wake all parked threads) so they can # shutdown (if told to): @@ -294,6 +278,8 @@ module Fiber::ExecutionContext Crystal.trace :sched, "park" @parked.add(1, :acquire_release) + # TODO: detach the scheduler and return the thread back into ThreadPool + # instead @condition.wait(@mutex) # we don't decrement @parked because #wake_scheduler did @@ -355,13 +341,13 @@ module Fiber::ExecutionContext # check if we can start another thread; no need for atomics, the values # shall be rather stable over time and we check them again inside the # mutex - return if @threads.size >= capacity + return if @started >= capacity @mutex.synchronize do - index = @threads.size - return if index >= capacity # check again - - @threads << start_thread(@schedulers[index]) + if (index = @started) < capacity + start_thread(@schedulers[index]) + @started += 1 + end end end diff --git a/src/fiber/execution_context/parallel/scheduler.cr b/src/fiber/execution_context/parallel/scheduler.cr index 89be89ccaa69..6a9d7e34f138 100644 --- a/src/fiber/execution_context/parallel/scheduler.cr +++ b/src/fiber/execution_context/parallel/scheduler.cr @@ -19,7 +19,7 @@ module Fiber::ExecutionContext # :nodoc: property execution_context : Parallel protected property! thread : Thread - protected property! main_fiber : Fiber + protected property main_fiber : Fiber @global_queue : GlobalQueue @runnables : Runnables(256) @@ -35,6 +35,7 @@ module Fiber::ExecutionContext @global_queue = @execution_context.global_queue @runnables = Runnables(256).new(@global_queue) @event_loop = @execution_context.event_loop + @main_fiber = Fiber.new("#{@name}:loop", @execution_context) { run_loop } end protected def shutdown! 
: Nil diff --git a/src/fiber/execution_context/thread_pool.cr b/src/fiber/execution_context/thread_pool.cr new file mode 100644 index 000000000000..c34ce46cbfba --- /dev/null +++ b/src/fiber/execution_context/thread_pool.cr @@ -0,0 +1,200 @@ +class Fiber + module ExecutionContext + # How long a parked thread will be kept waiting in the thread pool. + # Defaults to 5 minutes. + class_property thread_keepalive : Time::Span = 5.minutes + + # :nodoc: + class ThreadPool + # :nodoc: + struct Parked + include Crystal::PointerLinkedList::Node + + getter thread : Thread + + def initialize(@thread : Thread) + @mutex = Thread::Mutex.new + @condition_variable = Thread::ConditionVariable.new + end + + def synchronize(&) + @mutex.synchronize { yield } + end + + def wake + @condition_variable.signal + end + + def wait + @condition_variable.wait(@mutex) + end + + def wait(timeout, &) + @condition_variable.wait(@mutex, timeout) { yield } + end + + def linked? + !@previous.null? + end + end + + def initialize + @mutex = Thread::Mutex.new + @condition_variable = Thread::ConditionVariable.new + @pool = Crystal::PointerLinkedList(Parked).new + @main_thread = Thread.current + end + + protected def checkout(scheduler) + thread = + if parked = @mutex.synchronize { @pool.shift? 
} + parked.value.synchronize do + attach(parked.value.thread, scheduler) + parked.value.wake + end + parked.value.thread + else + # OPTIMIZE: start thread with minimum stack size + Thread.new do |thread| + attach(thread, scheduler) + enter_thread_loop(thread) + end + end + Crystal.trace :sched, "thread.checkout", thread: thread + thread + end + + protected def attach(thread, scheduler) : Nil + thread.execution_context = scheduler.execution_context + thread.scheduler = scheduler + scheduler.thread = thread + end + + protected def detach(thread) : Nil + thread.execution_context = nil + thread.scheduler = nil + end + + protected def checkin : Nil + Crystal.trace :sched, "thread.checkin" + thread = Thread.current + detach(thread) + + if thread == @main_thread + resume(main_thread_loop) + else + Thread.name = "" + resume(thread.main_fiber) + end + end + + private def main_thread_loop + @main_thread_loop ||= begin + # OPTIMIZE: allocate minimum stack size; NOTE(review): `stack` below is built but never passed to Fiber.new -- confirm whether it is meant to back this fiber + pointer = Crystal::System::Fiber.allocate_stack(StackPool::STACK_SIZE, protect: true) + stack = Stack.new(pointer, StackPool::STACK_SIZE, reusable: true) + Fiber.new(execution_context: ExecutionContext.default) { enter_thread_loop(@main_thread) } + end + end + + # Each thread has a general loop, which is used to park the thread while + # it's in the thread pool. On startup then on wakeup it will resume the + # associated scheduler's main fiber, which itself is running the + # scheduler's run loop. + # + # Upon checkin the thread pool will merely resume the thread's main loop, + # leaving the scheduler's main fiber available for resume by another + # thread if needed, or left dead if the scheduler has shut down (e.g. + # isolated context). + private def enter_thread_loop(thread) + parked = Parked.new(thread) + parked.synchronize do + loop do + if scheduler = thread.scheduler? 
+ unless thread == @main_thread + Thread.name = scheduler.name + end + + resume(scheduler.main_fiber) + + {% unless flag?(:interpreted) %} + if (stack = Thread.current.dead_fiber_stack?) && stack.reusable? + # release pending fiber stack left after swapcontext; we don't + # know which stack pool to return it to, and it may not even + # have one (e.g. isolated fiber stack) + Crystal::System::Fiber.free_stack(stack.pointer, stack.size) + end + {% end %} + end + + @mutex.synchronize do + @pool.push pointerof(parked) + end + + if thread == @main_thread + # never shutdown the main thread: the main fiber is running on its + # original stack, terminating the main thread would invalidate the + # main fiber stack (oops) + parked.wait + else + parked.wait(ExecutionContext.thread_keepalive) do + # reached timeout: try to shutdown thread, but another thread + # might dequeue from @pool in parallel: run checks to avoid any + # race condition: + if !thread.scheduler? && parked.linked? + deleted = false + + @mutex.synchronize do + if parked.linked? + @pool.delete pointerof(parked) + deleted = true + end + end + + if deleted + # no attached scheduler and we removed ourselves from the + # pool: we can safely shutdown (no races) + Crystal.trace :sched, "thread.shutdown" + return + end + + # no attached scheduler but another thread removed ourselves + # from the pool and is waiting to acquire parked.mutex to + # handoff a scheduler: unsync so it can progress + parked.wait + end + end + end + rescue exception + Crystal.trace :sched, "thread.exception", + class: exception.class.name, + message: exception.message + + Crystal.print_error_buffered("BUG: %s#enter_thread_loop crashed", + self.class.name, exception: exception) + end + end + end + + private def resume(fiber) : Nil + Crystal.trace :sched, "thread.resume", fiber: fiber + + # FIXME: duplicates Fiber::ExecutionContext::MultiThreaded::Scheduler#resume: + attempts = 0 + until fiber.resumable? 
+ raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})" if fiber.dead? + attempts = Thread.delay(attempts) + end + + # FIXME: duplicates Fiber::ExecutionContext::Scheduler#swapcontext: + thread = Thread.current + current_fiber = thread.current_fiber + + GC.lock_read + thread.current_fiber = fiber + Fiber.swapcontext(pointerof(current_fiber.@context), pointerof(fiber.@context)) + GC.unlock_read + end + end + end +end