Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ end
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
fiber = Fiber.new(name, &block)
if same_thread
fiber.@current_thread.set(Thread.current)
end
Crystal::Scheduler.enqueue fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
end

Expand Down
27 changes: 21 additions & 6 deletions src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class Crystal::Scheduler
end

def self.resume(fiber : Fiber) : Nil
validate_running_thread(fiber)
Thread.current.scheduler.resume(fiber)
end

Expand All @@ -63,9 +64,22 @@ class Crystal::Scheduler
end

def self.yield(fiber : Fiber) : Nil
validate_running_thread(fiber)
Thread.current.scheduler.yield(fiber)
end

private def self.validate_running_thread(fiber : Fiber) : Nil
{% if flag?(:preview_mt) %}
if th = fiber.get_current_thread
unless th == Thread.current
raise "BUG: tried to manually resume #{fiber} on #{Thread.current} instead of #{th}"
end
else
fiber.set_current_thread
end
{% end %}
end

{% if flag?(:preview_mt) %}
def self.enqueue_free_stack(stack : Void*) : Nil
Thread.current.scheduler.enqueue_free_stack(stack)
Expand All @@ -76,11 +90,15 @@ class Crystal::Scheduler
private getter(fiber_channel : Crystal::FiberChannel) { Crystal::FiberChannel.new }
@free_stacks = Deque(Void*).new
{% end %}

@main : Fiber
@lock = Crystal::SpinLock.new
@sleeping = false

# :nodoc:
def initialize(@main : Fiber)
def initialize(thread : Thread)
@main = thread.main_fiber
{% if flag?(:preview_mt) %} @main.set_current_thread(thread) {% end %}
@current = @main
@runnables = Deque(Fiber).new
end
Expand All @@ -95,8 +113,8 @@ class Crystal::Scheduler

protected def resume(fiber : Fiber) : Nil
validate_resumable(fiber)

{% if flag?(:preview_mt) %}
set_current_thread(fiber)
GC.lock_read
{% elsif flag?(:interpreted) %}
# No need to change the stack bottom!
Expand Down Expand Up @@ -130,10 +148,6 @@ class Crystal::Scheduler
end
end

private def set_current_thread(fiber)
fiber.@current_thread.set(Thread.current)
end

private def fatal_resume_error(fiber, message)
Crystal::System.print_error "\nFATAL: #{message}: #{fiber}\n"
caller.each { |line| Crystal::System.print_error " from #{line}\n" }
Expand Down Expand Up @@ -236,6 +250,7 @@ class Crystal::Scheduler
@@workers = Array(Thread).new(count) do |i|
if i == 0
worker_loop = Fiber.new(name: "Worker Loop") { Thread.current.scheduler.run_loop }
worker_loop.set_current_thread
Thread.current.scheduler.enqueue worker_loop
Thread.current
else
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class Thread
end

# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(main_fiber) }
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

protected def start
Thread.threads.push(self)
Expand Down
16 changes: 14 additions & 2 deletions src/fiber.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class Fiber
property name : String?

@alive = true
@current_thread = Atomic(Thread?).new(nil)
{% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %}

# :nodoc:
property next : Fiber?
Expand Down Expand Up @@ -136,7 +136,7 @@ class Fiber
{% end %}
thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom
@name = "main"
@current_thread.set(thread)
{% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %}
Fiber.fibers.push(self)
end

Expand Down Expand Up @@ -305,4 +305,16 @@ class Fiber
# Push the used section of the stack
GC.push_stack @context.stack_top, @stack_bottom
end

{% if flag?(:preview_mt) %}
# :nodoc:
def set_current_thread(thread = Thread.current) : Thread
@current_thread.set(thread)
end

# :nodoc:
def get_current_thread : Thread?
@current_thread.lazy_get
end
{% end %}
end