From 65ff4853434fbb9ff0d80af5cb2a37820b1e7a01 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 26 May 2025 16:11:02 +0200 Subject: [PATCH 1/2] Add Fiber::ExecutionContext::ThreadPool A global pool of thread to start new threads from, and return threads to, so we don't start and stop threads all the time, and can wake an existing thread instead of creating a new one from scratch. The thread pool still eventually shuts down a thread after a configurable keepalive is reached, but takes extra measures to never shutdown the main thread, which would invalide the program's main fiber stack (segfaults). --- src/crystal/tracing.cr | 8 + src/fiber/execution_context.cr | 7 + src/fiber/execution_context/thread_pool.cr | 200 +++++++++++++++++++++ 3 files changed, 215 insertions(+) create mode 100644 src/fiber/execution_context/thread_pool.cr diff --git a/src/crystal/tracing.cr b/src/crystal/tracing.cr index 912a4bad171b..918d262b9a03 100644 --- a/src/crystal/tracing.cr +++ b/src/crystal/tracing.cr @@ -82,6 +82,14 @@ module Crystal write value.name || '?' end + def write(value : Thread) : Nil + {% if flag?(:linux) %} + write Pointer(Void).new(value.@system_handle) + {% else %} + write value.@system_handle + {% end %} + end + {% if flag?(:execution_context) %} def write(value : Fiber::ExecutionContext) : Nil write value.name diff --git a/src/fiber/execution_context.cr b/src/fiber/execution_context.cr index 8e18ac4bd018..795a07ae22ec 100644 --- a/src/fiber/execution_context.cr +++ b/src/fiber/execution_context.cr @@ -69,8 +69,14 @@ require "./execution_context/*" # ``` @[Experimental] module Fiber::ExecutionContext + @@thread_pool : ThreadPool? @@default : ExecutionContext::Parallel? + # :nodoc: + def self.thread_pool : ThreadPool + @@thread_pool.not_nil!("expected thread pool to have been setup") + end + # Returns the default `ExecutionContext` for the process, automatically # started when the program started. # @@ -84,6 +90,7 @@ module Fiber::ExecutionContext # :nodoc: def self.init_default_context : Nil + @@thread_pool = ThreadPool.new @@default = Parallel.default(1) @@monitor = Monitor.new end diff --git a/src/fiber/execution_context/thread_pool.cr b/src/fiber/execution_context/thread_pool.cr new file mode 100644 index 000000000000..c34ce46cbfba --- /dev/null +++ b/src/fiber/execution_context/thread_pool.cr @@ -0,0 +1,200 @@ +class Fiber + module ExecutionContext + # How long a parked thread will be kept waiting in the thread pool. + # Defaults to 5 minutes. + class_property thread_keepalive : Time::Span = 5.minutes + + # :nodoc: + class ThreadPool + # :nodoc: + struct Parked + include Crystal::PointerLinkedList::Node + + getter thread : Thread + + def initialize(@thread : Thread) + @mutex = Thread::Mutex.new + @condition_variable = Thread::ConditionVariable.new + end + + def synchronize(&) + @mutex.synchronize { yield } + end + + def wake + @condition_variable.signal + end + + def wait + @condition_variable.wait(@mutex) + end + + def wait(timeout, &) + @condition_variable.wait(@mutex, timeout) { yield } + end + + def linked? + !@previous.null? + end + end + + def initialize + @mutex = Thread::Mutex.new + @condition_variable = Thread::ConditionVariable.new + @pool = Crystal::PointerLinkedList(Parked).new + @main_thread = Thread.current + end + + protected def checkout(scheduler) + thread = + if parked = @mutex.synchronize { @pool.shift? } + parked.value.synchronize do + attach(parked.value.thread, scheduler) + parked.value.wake + end + parked.value.thread + else + # OPTIMIZE: start thread with minimum stack size + Thread.new do |thread| + attach(thread, scheduler) + enter_thread_loop(thread) + end + end + Crystal.trace :sched, "thread.checkout", thread: thread + thread + end + + protected def attach(thread, scheduler) : Nil + thread.execution_context = scheduler.execution_context + thread.scheduler = scheduler + scheduler.thread = thread + end + + protected def detach(thread) : Nil + thread.execution_context = nil + thread.scheduler = nil + end + + protected def checkin : Nil + Crystal.trace :sched, "thread.checkin" + thread = Thread.current + detach(thread) + + if thread == @main_thread + resume(main_thread_loop) + else + Thread.name = "" + resume(thread.main_fiber) + end + end + + private def main_thread_loop + @main_thread_loop ||= begin + # OPTIMIZE: allocate minimum stack size + pointer = Crystal::System::Fiber.allocate_stack(StackPool::STACK_SIZE, protect: true) + stack = Stack.new(pointer, StackPool::STACK_SIZE, reusable: true) + Fiber.new(execution_context: ExecutionContext.default) { enter_thread_loop(@main_thread) } + end + end + + # Each thread has a general loop, which is used to park the thread while + # it's in the thread pool. On startup then on wakeup it will resume the + # associated scheduler's main fiber, which itself is running the + # scheduler's run loop. + # + # Upon checkout the thread pool will merely resume the thread's main loop, + # leaving the scheduler's main fiber available for resume by another + # thread if needed, or left dead if the scheduler has shut down (e.g. + # isolated context). + private def enter_thread_loop(thread) + parked = Parked.new(thread) + parked.synchronize do + loop do + if scheduler = thread.scheduler? + unless thread == @main_thread + Thread.name = scheduler.name + end + + resume(scheduler.main_fiber) + + {% unless flag?(:interpreted) %} + if (stack = Thread.current.dead_fiber_stack?) && stack.reusable? + # release pending fiber stack left after swapcontext; we don't + # know which stack pool to return it to, and it may not even + # have one (e.g. isolated fiber stack) + Crystal::System::Fiber.free_stack(stack.pointer, stack.size) + end + {% end %} + end + + @mutex.synchronize do + @pool.push pointerof(parked) + end + + if thread == @main_thread + # never shutdown the main thread: the main fiber is running on its + # original stack, terminating the main thread would invalidate the + # main fiber stack (oops) + parked.wait + else + parked.wait(ExecutionContext.thread_keepalive) do + # reached timeout: try to shutdown thread, but another thread + # might dequeue from @pool in parallel: run checks to avoid any + # race condition: + if !thread.scheduler? && parked.linked? + deleted = false + + @mutex.synchronize do + if parked.linked? + @pool.delete pointerof(parked) + deleted = true + end + end + + if deleted + # no attached scheduler and we removed ourselves from the + # pool: we can safely shutdown (no races) + Crystal.trace :sched, "thread.shutdown" + return + end + + # no attached scheduler but another thread removed ourselves + # from the pool and is waiting to acquire parked.mutex to + # handoff a scheduler: unsync so it can progress + parked.wait + end + end + end + rescue exception + Crystal.trace :sched, "thread.exception", + class: exception.class.name, + message: exception.message + + Crystal.print_error_buffered("BUG: %s#enter_thread_loop crashed", + self.class.name, exception: exception) + end + end + end + + private def resume(fiber) : Nil + Crystal.trace :sched, "thread.resume", fiber: fiber + + # FIXME: duplicates Fiber::ExecutionContext::MultiThreaded::Scheduler#resume: + attempts = 0 + until fiber.resumable? + raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})" if fiber.dead? + attempts = Thread.delay(attempts) + end + + # FIXME: duplicates Fiber::ExecutionContext::Scheduler#swapcontext: + thread = Thread.current + current_fiber = thread.current_fiber + + GC.lock_read + thread.current_fiber = fiber + Fiber.swapcontext(pointerof(current_fiber.@context), pointerof(fiber.@context)) + GC.unlock_read + end + end + end +end From 5370dd5c006ee43926f98c2cf317a837940982dc Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 2 Jun 2025 20:43:53 +0200 Subject: [PATCH 2/2] Execution contexts now take threads from thread pool --- src/crystal/system/thread.cr | 9 ++- src/fiber/execution_context/isolated.cr | 27 +++++--- src/fiber/execution_context/parallel.cr | 66 ++++++++----------- .../execution_context/parallel/scheduler.cr | 3 +- 4 files changed, 52 insertions(+), 53 deletions(-) diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 8272b3a84f38..06f8af461ba8 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -84,11 +84,14 @@ class Thread getter! execution_context : Fiber::ExecutionContext # :nodoc: - property! scheduler : Fiber::ExecutionContext::Scheduler + def execution_context=(@execution_context : Fiber::ExecutionContext?) + end + + # :nodoc: + getter! scheduler : Fiber::ExecutionContext::Scheduler # :nodoc: - def execution_context=(@execution_context : Fiber::ExecutionContext) : Fiber::ExecutionContext - main_fiber.execution_context = execution_context + def scheduler=(@scheduler : Fiber::ExecutionContext::Scheduler?) end # When a fiber terminates we can't release its stack until we swap context diff --git a/src/fiber/execution_context/isolated.cr b/src/fiber/execution_context/isolated.cr index 6dacf1966b02..96a4d8fcdf57 100644 --- a/src/fiber/execution_context/isolated.cr +++ b/src/fiber/execution_context/isolated.cr @@ -45,7 +45,7 @@ module Fiber::ExecutionContext @mutex = Thread::Mutex.new @condition = Thread::ConditionVariable.new protected getter thread : Thread - @main_fiber : Fiber + protected getter main_fiber : Fiber # :nodoc: getter(event_loop : Crystal::EventLoop) do @@ -66,19 +66,23 @@ module Fiber::ExecutionContext def initialize(@name : String, @spawn_context : ExecutionContext = ExecutionContext.default, &@func : ->) @thread = uninitialized Thread @main_fiber = uninitialized Fiber + @main_fiber = Fiber.new(@name, allocate_stack, self) { run } @thread = start_thread ExecutionContext.execution_contexts.push(self) end + private def allocate_stack : Stack + # no stack pool: we directly allocate a stack; it will be automatically + # released when the thread is returned to the thread pool + pointer = Crystal::System::Fiber.allocate_stack(StackPool::STACK_SIZE, protect: true) + Stack.new(pointer, StackPool::STACK_SIZE, reusable: true) + end + private def start_thread : Thread - Thread.new(name: @name) do |thread| - @thread = thread - thread.execution_context = self - thread.scheduler = self - thread.main_fiber.name = @name - @main_fiber = thread.main_fiber - run - end + ExecutionContext.thread_pool.checkout(self) + end + + protected def thread=(@thread) end # :nodoc: @@ -136,6 +140,11 @@ module Fiber::ExecutionContext protected def reschedule : Nil Crystal.trace :sched, "reschedule" + if @main_fiber.dead? + ExecutionContext.thread_pool.checkin + return # actually unreachable + end + if event_loop = @event_loop wait_for(event_loop) else diff --git a/src/fiber/execution_context/parallel.cr b/src/fiber/execution_context/parallel.cr index 51f50a37fd13..56ed82665363 100644 --- a/src/fiber/execution_context/parallel.cr +++ b/src/fiber/execution_context/parallel.cr @@ -75,8 +75,12 @@ module Fiber::ExecutionContext @spinning = Atomic(Int32).new(0) # :nodoc: + # + # Starts the default execution context. There can be only one for the whole + # process. Must be called from the main thread's main fiber; associates the + # current thread and fiber to the created execution context. protected def self.default(maximum : Int32) : self - new("DEFAULT", maximum, hijack: true) + Fiber.current.execution_context = new("DEFAULT", maximum, hijack: true) end # Starts a `Parallel` context with a *maximum* parallelism. The context @@ -98,28 +102,27 @@ module Fiber::ExecutionContext new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false) end - protected def initialize(@name : String, @capacity : Int32, hijack : Bool) - raise ArgumentError.new("Parallelism can't be less than one.") if @capacity < 1 + protected def initialize(@name : String, capacity : Int32, hijack : Bool) + raise ArgumentError.new("Parallelism can't be less than one.") if capacity < 1 @mutex = Thread::Mutex.new @condition = Thread::ConditionVariable.new @global_queue = GlobalQueue.new(@mutex) @schedulers = Array(Scheduler).new(capacity) - @threads = Array(Thread).new(capacity) @event_loop = Crystal::EventLoop.create(capacity) - + @started = hijack ? 1 : 0 @rng = Random::PCG32.new start_schedulers(capacity) - @threads << hijack_current_thread(@schedulers.first) if hijack + hijack_current_thread(@schedulers.first) if hijack ExecutionContext.execution_contexts.push(self) end # The number of threads that have been started. def size : Int32 - @threads.size + @started end # The maximum number of threads that can be started. @@ -134,12 +137,11 @@ module Fiber::ExecutionContext # Starts all schedulers at once. # - # We could lazily initialize them as needed, like we do for threads, which - # would be safe as long as we only mutate when the mutex is locked... but - # unlike @threads, we do iterate the schedulers in #steal without locking - # the mutex (for obvious reasons) and there are no guarantees that the new - # schedulers.@size will be written after the scheduler has been written to - # the array's buffer. + # We could lazily initialize them as needed, would be safe as long as we + # only mutate when the mutex is locked, but we iterate the schedulers in + # #steal without locking the mutex (for obvious reasons) and there are no + # guarantees that the new schedulers.@size will be written after the + # scheduler has been written to the array's buffer. # # OPTIMIZE: consider storing schedulers to an array-like object that would # use an atomic/fence to make sure that @size can only be incremented @@ -156,32 +158,18 @@ module Fiber::ExecutionContext # Attaches *scheduler* to the current `Thread`, usually the process' main # thread. Starts a `Fiber` to run the scheduler loop. - private def hijack_current_thread(scheduler) : Thread + private def hijack_current_thread(scheduler) : Nil thread = Thread.current thread.internal_name = scheduler.name thread.execution_context = self thread.scheduler = scheduler - scheduler.thread = thread - scheduler.main_fiber = Fiber.new("#{scheduler.name}:loop", self) do - scheduler.run_loop - end - - thread end # Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop # directly in the thread's main `Fiber`. - private def start_thread(scheduler) : Thread - Thread.new(name: scheduler.name) do |thread| - thread.execution_context = self - thread.scheduler = scheduler - - scheduler.thread = thread - scheduler.main_fiber = thread.main_fiber - scheduler.main_fiber.name = "#{scheduler.name}:loop" - scheduler.run_loop - end + private def start_thread(scheduler) : Nil + ExecutionContext.thread_pool.checkout(scheduler) end # Resizes the context to the new *maximum* parallelism. @@ -203,7 +191,6 @@ module Fiber::ExecutionContext # and eventually assign the copy as @schedulers; this way #steal can # safely access the array (never mutated). new_capacity = maximum - old_threads = @threads old_schedulers = @schedulers old_capacity = capacity @@ -211,9 +198,6 @@ module Fiber::ExecutionContext @schedulers = Array(Scheduler).new(new_capacity) do |index| old_schedulers[index]? || start_scheduler(index) end - threads = Array(Thread).new(new_capacity) - old_threads.each { |thread| threads << thread } - @threads = threads elsif new_capacity < old_capacity # tell the overflow schedulers to shutdown removed_schedulers = old_schedulers[new_capacity..] @@ -221,7 +205,7 @@ module Fiber::ExecutionContext # resize @schedulers = old_schedulers[0...new_capacity] - @threads = old_threads[0...new_capacity] + @started = new_capacity if @started > new_capacity # reset @parked counter (we wake all parked threads) so they can # shutdown (if told to): @@ -294,6 +278,8 @@ module Fiber::ExecutionContext Crystal.trace :sched, "park" @parked.add(1, :acquire_release) + # TODO: detach the scheduler and return the thread back into ThreadPool + # instead @condition.wait(@mutex) # we don't decrement @parked because #wake_scheduler did @@ -355,13 +341,13 @@ module Fiber::ExecutionContext # check if we can start another thread; no need for atomics, the values # shall be rather stable over time and we check them again inside the # mutex - return if @threads.size >= capacity + return if @started >= capacity @mutex.synchronize do - index = @threads.size - return if index >= capacity # check again - - @threads << start_thread(@schedulers[index]) + if (index = @started) < capacity + start_thread(@schedulers[index]) + @started += 1 + end end end diff --git a/src/fiber/execution_context/parallel/scheduler.cr b/src/fiber/execution_context/parallel/scheduler.cr index 89be89ccaa69..6a9d7e34f138 100644 --- a/src/fiber/execution_context/parallel/scheduler.cr +++ b/src/fiber/execution_context/parallel/scheduler.cr @@ -19,7 +19,7 @@ module Fiber::ExecutionContext # :nodoc: property execution_context : Parallel protected property! thread : Thread - protected property! main_fiber : Fiber + protected property main_fiber : Fiber @global_queue : GlobalQueue @runnables : Runnables(256) @@ -35,6 +35,7 @@ module Fiber::ExecutionContext @global_queue = @execution_context.global_queue @runnables = Runnables(256).new(@global_queue) @event_loop = @execution_context.event_loop + @main_fiber = Fiber.new("#{@name}:loop", @execution_context) { run_loop } end protected def shutdown! : Nil