Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions spec/std/http/server/server_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ describe HTTP::Server do
server = HTTP::Server.new { }
server.bind_unused_port

spawn do
run_server(server) do
server.close
sleep 0.001
end

server.listen
end

it "closes the server" do
Expand Down
5 changes: 5 additions & 0 deletions spec/std/http/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def run_server(server, &)
wait_for { server.listening? }
wait_until_blocked f

{% if flag?(:preview_mt) %}
# avoids fiber synchronization issues in specs, like closing the server
# before we properly listen, ...
sleep 0.001
{% end %}
yield server_done
ensure
server.close unless server.closed?
Expand Down
6 changes: 2 additions & 4 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ end
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
fiber = Fiber.new(name, &block)
if same_thread
fiber.@current_thread.set(Thread.current)
end
Crystal::Scheduler.enqueue fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
end

Expand Down
109 changes: 59 additions & 50 deletions src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ require "crystal/system/event_loop"
require "crystal/system/print_error"
require "./fiber_channel"
require "fiber"
require "fiber/stack_pool"
require "crystal/system/thread"

# :nodoc:
Expand All @@ -13,6 +14,11 @@ require "crystal/system/thread"
# protected and must never be called directly.
class Crystal::Scheduler
@event_loop = Crystal::EventLoop.create
@stack_pool = Fiber::StackPool.new

def self.stack_pool : Fiber::StackPool
Thread.current.scheduler.@stack_pool
end

def self.event_loop
Thread.current.scheduler.@event_loop
Expand All @@ -23,20 +29,21 @@ class Crystal::Scheduler
end

def self.enqueue(fiber : Fiber) : Nil
{% if flag?(:preview_mt) %}
th = fiber.@current_thread.lazy_get
thread = Thread.current
scheduler = thread.scheduler

if th.nil?
th = Thread.current.scheduler.find_target_thread
{% if flag?(:preview_mt) %}
unless th = fiber.get_current_thread
th = fiber.set_current_thread(scheduler.find_target_thread)
end

if th == Thread.current
Thread.current.scheduler.enqueue(fiber)
if th == thread
scheduler.enqueue(fiber)
else
th.scheduler.send_fiber(fiber)
end
{% else %}
Thread.current.scheduler.enqueue(fiber)
scheduler.enqueue(fiber)
{% end %}
end

Expand All @@ -51,6 +58,7 @@ class Crystal::Scheduler
end

def self.resume(fiber : Fiber) : Nil
validate_running_thread(fiber)
Thread.current.scheduler.resume(fiber)
end

Expand All @@ -59,28 +67,41 @@ class Crystal::Scheduler
end

def self.yield : Nil
Thread.current.scheduler.yield
# TODO: Fiber switching and libevent for wasm32
{% unless flag?(:wasm32) %}
Thread.current.scheduler.sleep(0.seconds)
{% end %}
end

def self.yield(fiber : Fiber) : Nil
validate_running_thread(fiber)
Thread.current.scheduler.yield(fiber)
end

{% if flag?(:preview_mt) %}
def self.enqueue_free_stack(stack : Void*) : Nil
Thread.current.scheduler.enqueue_free_stack(stack)
end
{% end %}
private def self.validate_running_thread(fiber : Fiber) : Nil
{% if flag?(:preview_mt) %}
if th = fiber.get_current_thread
unless th == Thread.current
raise "BUG: tried to manually resume #{fiber} on #{Thread.current} instead of #{th}"
end
else
fiber.set_current_thread
end
{% end %}
end

{% if flag?(:preview_mt) %}
private getter(fiber_channel : Crystal::FiberChannel) { Crystal::FiberChannel.new }
@free_stacks = Deque(Void*).new
{% end %}

@main : Fiber
@lock = Crystal::SpinLock.new
@sleeping = false

# :nodoc:
def initialize(@main : Fiber)
def initialize(thread : Thread)
@main = thread.main_fiber
{% if flag?(:preview_mt) %} @main.set_current_thread(thread) {% end %}
@current = @main
@runnables = Deque(Fiber).new
end
Expand All @@ -95,8 +116,8 @@ class Crystal::Scheduler

protected def resume(fiber : Fiber) : Nil
validate_resumable(fiber)

{% if flag?(:preview_mt) %}
set_current_thread(fiber)
GC.lock_read
{% elsif flag?(:interpreted) %}
# No need to change the stack bottom!
Expand Down Expand Up @@ -130,57 +151,28 @@ class Crystal::Scheduler
end
end

private def set_current_thread(fiber)
fiber.@current_thread.set(Thread.current)
end

private def fatal_resume_error(fiber, message)
Crystal::System.print_error "\nFATAL: #{message}: #{fiber}\n"
caller.each { |line| Crystal::System.print_error " from #{line}\n" }
exit 1
end

{% if flag?(:preview_mt) %}
protected def enqueue_free_stack(stack)
@free_stacks.push stack
end

private def release_free_stacks
while stack = @free_stacks.shift?
Fiber.stack_pool.release stack
end
end
{% end %}

protected def reschedule : Nil
loop do
if runnable = @lock.sync { @runnables.shift? }
unless runnable == @current
runnable.resume
end
resume(runnable) unless runnable == @current
break
else
@event_loop.run_once
end
end

{% if flag?(:preview_mt) %}
release_free_stacks
{% end %}
end

protected def sleep(time : Time::Span) : Nil
@current.resume_event.add(time)
reschedule
end

protected def yield : Nil
# TODO: Fiber switching and libevent for wasm32
{% unless flag?(:wasm32) %}
sleep(0.seconds)
{% end %}
end

protected def yield(fiber : Fiber) : Nil
@current.resume_event.add(0.seconds)
resume(fiber)
Expand All @@ -191,21 +183,24 @@ class Crystal::Scheduler

protected def find_target_thread
if workers = @@workers
@rr_target += 1
@rr_target &+= 1
workers[@rr_target % workers.size]
else
Thread.current
end
end

def run_loop
spawn_stack_pool_collector

fiber_channel = self.fiber_channel
loop do
@lock.lock

if runnable = @runnables.shift?
@runnables << Fiber.current
@lock.unlock
runnable.resume
resume(runnable)
else
@sleeping = true
@lock.unlock
Expand All @@ -215,7 +210,7 @@ class Crystal::Scheduler
@sleeping = false
@runnables << Fiber.current
@lock.unlock
fiber.resume
resume(fiber)
end
end
end
Expand All @@ -230,12 +225,13 @@ class Crystal::Scheduler
@lock.unlock
end

def self.init_workers
def self.init : Nil
count = worker_count
pending = Atomic(Int32).new(count - 1)
@@workers = Array(Thread).new(count) do |i|
if i == 0
worker_loop = Fiber.new(name: "Worker Loop") { Thread.current.scheduler.run_loop }
worker_loop.set_current_thread
Thread.current.scheduler.enqueue worker_loop
Thread.current
else
Expand Down Expand Up @@ -271,5 +267,18 @@ class Crystal::Scheduler
4
end
end
{% else %}
def self.init : Nil
{% unless flag?(:interpreted) %}
Thread.current.scheduler.spawn_stack_pool_collector
{% end %}
end
{% end %}

# Background loop to cleanup unused fiber stacks.
def spawn_stack_pool_collector
fiber = Fiber.new(name: "Stack pool collector", &->@stack_pool.collect_loop)
{% if flag?(:preview_mt) %} fiber.set_current_thread {% end %}
enqueue(fiber)
end
end
2 changes: 1 addition & 1 deletion src/crystal/system/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class Thread
end

# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(main_fiber) }
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

protected def start
Thread.threads.push(self)
Expand Down
34 changes: 18 additions & 16 deletions src/fiber.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require "crystal/system/thread_linked_list"
require "./fiber/context"
require "./fiber/stack_pool"

# :nodoc:
@[NoInline]
Expand Down Expand Up @@ -47,9 +46,6 @@ class Fiber
# :nodoc:
protected class_getter(fibers) { Thread::LinkedList(Fiber).new }

# :nodoc:
class_getter stack_pool = StackPool.new

@context : Context
@stack : Void*
@resume_event : Crystal::EventLoop::Event?
Expand All @@ -62,7 +58,7 @@ class Fiber
property name : String?

@alive = true
@current_thread = Atomic(Thread?).new(nil)
{% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %}

# :nodoc:
property next : Fiber?
Expand All @@ -89,10 +85,9 @@ class Fiber
@context = Context.new
@stack, @stack_bottom =
{% if flag?(:interpreted) %}
# For interpreted mode we don't need a new stack, the stack is held by the interpreter
{Pointer(Void).null, Pointer(Void).null}
{% else %}
Fiber.stack_pool.checkout
Crystal::Scheduler.stack_pool.checkout
{% end %}

fiber_main = ->(f : Fiber) { f.run }
Expand Down Expand Up @@ -136,7 +131,7 @@ class Fiber
{% end %}
thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom
@name = "main"
@current_thread.set(thread)
{% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %}
Fiber.fibers.push(self)
end

Expand All @@ -153,14 +148,6 @@ class Fiber
ex.inspect_with_backtrace(STDERR)
STDERR.flush
ensure
{% if flag?(:preview_mt) %}
Crystal::Scheduler.enqueue_free_stack @stack
{% elsif flag?(:interpreted) %}
# For interpreted mode we don't need a new stack, the stack is held by the interpreter
{% else %}
Fiber.stack_pool.release(@stack)
{% end %}

# Remove the current fiber from the linked list
Fiber.inactive(self)

Expand All @@ -170,6 +157,9 @@ class Fiber
@timeout_select_action = nil

@alive = false
{% unless flag?(:interpreted) %}
Crystal::Scheduler.stack_pool.release(@stack)
{% end %}
Crystal::Scheduler.reschedule
end

Expand Down Expand Up @@ -305,4 +295,16 @@ class Fiber
# Push the used section of the stack
GC.push_stack @context.stack_top, @stack_bottom
end

{% if flag?(:preview_mt) %}
# :nodoc:
def set_current_thread(thread = Thread.current) : Thread
@current_thread.set(thread)
end

# :nodoc:
def get_current_thread : Thread?
@current_thread.lazy_get
end
{% end %}
end
Loading