diff --git a/spec/std/http/server/server_spec.cr b/spec/std/http/server/server_spec.cr index e5aa0ec261a7..f5a82a514e0b 100644 --- a/spec/std/http/server/server_spec.cr +++ b/spec/std/http/server/server_spec.cr @@ -45,12 +45,9 @@ describe HTTP::Server do server = HTTP::Server.new { } server.bind_unused_port - spawn do + run_server(server) do server.close - sleep 0.001 end - - server.listen end it "closes the server" do diff --git a/spec/std/http/spec_helper.cr b/spec/std/http/spec_helper.cr index 410e211e4dbe..18ec9e0bab46 100644 --- a/spec/std/http/spec_helper.cr +++ b/spec/std/http/spec_helper.cr @@ -46,6 +46,11 @@ def run_server(server, &) wait_for { server.listening? } wait_until_blocked f + {% if flag?(:preview_mt) %} + # avoids fiber synchronization issues in specs, like closing the server + # before we properly listen, ... + sleep 0.001 + {% end %} yield server_done ensure server.close unless server.closed? diff --git a/src/concurrent.cr b/src/concurrent.cr index 5b2747d0c581..af2f0aecf736 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -59,10 +59,8 @@ end # ``` def spawn(*, name : String? = nil, same_thread = false, &block) fiber = Fiber.new(name, &block) - if same_thread - fiber.@current_thread.set(Thread.current) - end - Crystal::Scheduler.enqueue fiber + {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} + fiber.enqueue fiber end diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index 0667626f787e..1a642cb6bef3 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -2,6 +2,7 @@ require "crystal/system/event_loop" require "crystal/system/print_error" require "./fiber_channel" require "fiber" +require "fiber/stack_pool" require "crystal/system/thread" # :nodoc: @@ -13,6 +14,11 @@ require "crystal/system/thread" # protected and must never be called directly. class Crystal::Scheduler @event_loop = Crystal::EventLoop.create + @stack_pool = Fiber::StackPool.new + + def self.stack_pool : Fiber::StackPool + Thread.current.scheduler.@stack_pool + end def self.event_loop Thread.current.scheduler.@event_loop @@ -23,20 +29,21 @@ class Crystal::Scheduler end def self.enqueue(fiber : Fiber) : Nil - {% if flag?(:preview_mt) %} - th = fiber.@current_thread.lazy_get + thread = Thread.current + scheduler = thread.scheduler - if th.nil? - th = Thread.current.scheduler.find_target_thread + {% if flag?(:preview_mt) %} + unless th = fiber.get_current_thread + th = fiber.set_current_thread(scheduler.find_target_thread) end - if th == Thread.current - Thread.current.scheduler.enqueue(fiber) + if th == thread + scheduler.enqueue(fiber) else th.scheduler.send_fiber(fiber) end {% else %} - Thread.current.scheduler.enqueue(fiber) + scheduler.enqueue(fiber) {% end %} end @@ -51,6 +58,7 @@ class Crystal::Scheduler end def self.resume(fiber : Fiber) : Nil + validate_running_thread(fiber) Thread.current.scheduler.resume(fiber) end @@ -59,28 +67,41 @@ class Crystal::Scheduler end def self.yield : Nil - Thread.current.scheduler.yield + # TODO: Fiber switching and libevent for wasm32 + {% unless flag?(:wasm32) %} + Thread.current.scheduler.sleep(0.seconds) + {% end %} end def self.yield(fiber : Fiber) : Nil + validate_running_thread(fiber) Thread.current.scheduler.yield(fiber) end - {% if flag?(:preview_mt) %} - def self.enqueue_free_stack(stack : Void*) : Nil - Thread.current.scheduler.enqueue_free_stack(stack) - end - {% end %} + private def self.validate_running_thread(fiber : Fiber) : Nil + {% if flag?(:preview_mt) %} + if th = fiber.get_current_thread + unless th == Thread.current + raise "BUG: tried to manually resume #{fiber} on #{Thread.current} instead of #{th}" + end + else + fiber.set_current_thread + end + {% end %} + end {% if flag?(:preview_mt) %} private getter(fiber_channel : Crystal::FiberChannel) { Crystal::FiberChannel.new } - @free_stacks = Deque(Void*).new {% end %} + + @main : Fiber @lock = Crystal::SpinLock.new @sleeping = false # :nodoc: - def initialize(@main : Fiber) + def initialize(thread : Thread) + @main = thread.main_fiber + {% if flag?(:preview_mt) %} @main.set_current_thread(thread) {% end %} @current = @main @runnables = Deque(Fiber).new end @@ -95,8 +116,8 @@ class Crystal::Scheduler protected def resume(fiber : Fiber) : Nil validate_resumable(fiber) + {% if flag?(:preview_mt) %} - set_current_thread(fiber) GC.lock_read {% elsif flag?(:interpreted) %} # No need to change the stack bottom! @@ -130,43 +151,21 @@ class Crystal::Scheduler end end - private def set_current_thread(fiber) - fiber.@current_thread.set(Thread.current) - end - private def fatal_resume_error(fiber, message) Crystal::System.print_error "\nFATAL: #{message}: #{fiber}\n" caller.each { |line| Crystal::System.print_error " from #{line}\n" } exit 1 end - {% if flag?(:preview_mt) %} - protected def enqueue_free_stack(stack) - @free_stacks.push stack - end - - private def release_free_stacks - while stack = @free_stacks.shift? - Fiber.stack_pool.release stack - end - end - {% end %} - protected def reschedule : Nil loop do if runnable = @lock.sync { @runnables.shift? } - unless runnable == @current - runnable.resume - end + resume(runnable) unless runnable == @current break else @event_loop.run_once end end - - {% if flag?(:preview_mt) %} - release_free_stacks - {% end %} end protected def sleep(time : Time::Span) : Nil @@ -174,13 +173,6 @@ class Crystal::Scheduler reschedule end - protected def yield : Nil - # TODO: Fiber switching and libevent for wasm32 - {% unless flag?(:wasm32) %} - sleep(0.seconds) - {% end %} - end - protected def yield(fiber : Fiber) : Nil @current.resume_event.add(0.seconds) resume(fiber) @@ -191,7 +183,7 @@ class Crystal::Scheduler protected def find_target_thread if workers = @@workers - @rr_target += 1 + @rr_target &+= 1 workers[@rr_target % workers.size] else Thread.current @@ -199,13 +191,16 @@ class Crystal::Scheduler end def run_loop + spawn_stack_pool_collector + fiber_channel = self.fiber_channel loop do @lock.lock + if runnable = @runnables.shift? @runnables << Fiber.current @lock.unlock - runnable.resume + resume(runnable) else @sleeping = true @lock.unlock @@ -215,7 +210,7 @@ class Crystal::Scheduler @sleeping = false @runnables << Fiber.current @lock.unlock - fiber.resume + resume(fiber) end end end @@ -230,12 +225,13 @@ class Crystal::Scheduler @lock.unlock end - def self.init_workers + def self.init : Nil count = worker_count pending = Atomic(Int32).new(count - 1) @@workers = Array(Thread).new(count) do |i| if i == 0 worker_loop = Fiber.new(name: "Worker Loop") { Thread.current.scheduler.run_loop } + worker_loop.set_current_thread Thread.current.scheduler.enqueue worker_loop Thread.current else @@ -271,5 +267,18 @@ class Crystal::Scheduler 4 end end + {% else %} + def self.init : Nil + {% unless flag?(:interpreted) %} + Thread.current.scheduler.spawn_stack_pool_collector + {% end %} + end {% end %} + + # Background loop to cleanup unused fiber stacks. + def spawn_stack_pool_collector + fiber = Fiber.new(name: "Stack pool collector", &->@stack_pool.collect_loop) + {% if flag?(:preview_mt) %} fiber.set_current_thread {% end %} + enqueue(fiber) + end end diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 878be639a7a3..88784ed68330 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -105,7 +105,7 @@ class Thread end # :nodoc: - getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(main_fiber) } + getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } protected def start Thread.threads.push(self) diff --git a/src/fiber.cr b/src/fiber.cr index f89489c2bd13..c96184f3cf1f 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -1,6 +1,5 @@ require "crystal/system/thread_linked_list" require "./fiber/context" -require "./fiber/stack_pool" # :nodoc: @[NoInline] @@ -47,9 +46,6 @@ class Fiber # :nodoc: protected class_getter(fibers) { Thread::LinkedList(Fiber).new } - # :nodoc: - class_getter stack_pool = StackPool.new - @context : Context @stack : Void* @resume_event : Crystal::EventLoop::Event? @@ -62,7 +58,7 @@ class Fiber property name : String? @alive = true - @current_thread = Atomic(Thread?).new(nil) + {% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %} # :nodoc: property next : Fiber? @@ -89,10 +85,9 @@ class Fiber @context = Context.new @stack, @stack_bottom = {% if flag?(:interpreted) %} - # For interpreted mode we don't need a new stack, the stack is held by the interpreter {Pointer(Void).null, Pointer(Void).null} {% else %} - Fiber.stack_pool.checkout + Crystal::Scheduler.stack_pool.checkout {% end %} fiber_main = ->(f : Fiber) { f.run } @@ -136,7 +131,7 @@ class Fiber {% end %} thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom @name = "main" - @current_thread.set(thread) + {% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %} Fiber.fibers.push(self) end @@ -153,14 +148,6 @@ class Fiber ex.inspect_with_backtrace(STDERR) STDERR.flush ensure - {% if flag?(:preview_mt) %} - Crystal::Scheduler.enqueue_free_stack @stack - {% elsif flag?(:interpreted) %} - # For interpreted mode we don't need a new stack, the stack is held by the interpreter - {% else %} - Fiber.stack_pool.release(@stack) - {% end %} - # Remove the current fiber from the linked list Fiber.inactive(self) @@ -170,6 +157,9 @@ class Fiber @timeout_select_action = nil @alive = false + {% unless flag?(:interpreted) %} + Crystal::Scheduler.stack_pool.release(@stack) + {% end %} Crystal::Scheduler.reschedule end @@ -305,4 +295,16 @@ class Fiber # Push the used section of the stack GC.push_stack @context.stack_top, @stack_bottom end + + {% if flag?(:preview_mt) %} + # :nodoc: + def set_current_thread(thread = Thread.current) : Thread + @current_thread.set(thread) + end + + # :nodoc: + def get_current_thread : Thread? + @current_thread.lazy_get + end + {% end %} end diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index 54d03e4ffa5f..aebd82a0870f 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -7,14 +7,13 @@ class Fiber def initialize @deque = Deque(Void*).new - @mutex = Thread::Mutex.new end # Removes and frees at most *count* stacks from the top of the pool, # returning memory to the operating system. def collect(count = lazy_size // 2) : Nil count.times do - if stack = @mutex.synchronize { @deque.shift? } + if stack = @deque.shift? Crystal::System::Fiber.free_stack(stack, STACK_SIZE) else return @@ -22,21 +21,28 @@ class Fiber end end + def collect_loop(every = 5.seconds) : Nil + loop do + sleep every + collect + end + end + # Removes a stack from the bottom of the pool, or allocates a new one. def checkout : {Void*, Void*} - stack = @mutex.synchronize { @deque.pop? } || Crystal::System::Fiber.allocate_stack(STACK_SIZE) + stack = @deque.pop? || Crystal::System::Fiber.allocate_stack(STACK_SIZE) {stack, stack + STACK_SIZE} end # Appends a stack to the bottom of the pool. def release(stack) : Nil - @mutex.synchronize { @deque.push(stack) } + @deque.push(stack) end # Returns the approximated size of the pool. It may be equal or slightly # bigger or smaller than the actual size. def lazy_size : Int32 - @mutex.synchronize { @deque.size } + @deque.size end end end diff --git a/src/kernel.cr b/src/kernel.cr index c3b3106ccae3..d3817ee11661 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -563,14 +563,6 @@ end {% end %} {% unless flag?(:interpreted) || flag?(:wasm32) %} - # Background loop to cleanup unused fiber stacks. - spawn(name: "Fiber Clean Loop") do - loop do - sleep 5 - Fiber.stack_pool.collect - end - end - {% if flag?(:win32) %} Crystal::System::Process.start_interrupt_loop {% else %} @@ -586,7 +578,5 @@ end Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? == "1" Exception::CallStack.setup_crash_handler - {% if flag?(:preview_mt) %} - Crystal::Scheduler.init_workers - {% end %} + Crystal::Scheduler.init {% end %}