diff --git a/spec/std/fiber/execution_context/global_queue_spec.cr b/spec/std/fiber/execution_context/global_queue_spec.cr index 17b746c7dc86..b6b866b79e88 100644 --- a/spec/std/fiber/execution_context/global_queue_spec.cr +++ b/spec/std/fiber/execution_context/global_queue_spec.cr @@ -1,4 +1,5 @@ require "./spec_helper" +require "../../../support/thread" describe Fiber::ExecutionContext::GlobalQueue do it "#initialize" do @@ -98,10 +99,9 @@ describe Fiber::ExecutionContext::GlobalQueue do increments = 15 queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) ready = Thread::WaitGroup.new(n) - shutdown = Thread::WaitGroup.new(n) - n.times do |i| - Thread.new("ONE-#{i}") do |thread| + threads = Array(Thread).new(n) do |i| + new_thread("ONE-#{i}") do slept = 0 ready.done @@ -117,10 +117,6 @@ describe Fiber::ExecutionContext::GlobalQueue do break end end - rescue exception - Crystal::System.print_error "\nthread: #{thread.name}: exception: #{exception}" - ensure - shutdown.done end end ready.wait @@ -130,7 +126,7 @@ describe Fiber::ExecutionContext::GlobalQueue do Thread.sleep(10.nanoseconds) if i % 10 == 9 end - shutdown.wait + threads.each(&.join) # must have dequeued each fiber exactly X times fibers.each { |fc| fc.counter.should eq(increments) } @@ -146,10 +142,9 @@ describe Fiber::ExecutionContext::GlobalQueue do queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) ready = Thread::WaitGroup.new(n) - shutdown = Thread::WaitGroup.new(n) - n.times do |i| - Thread.new("BULK-#{i}") do |thread| + threads = Array(Thread).new(n) do |i| + new_thread("BULK-#{i}") do slept = 0 r = Fiber::ExecutionContext::Runnables(3).new(queue) @@ -200,10 +195,6 @@ describe Fiber::ExecutionContext::GlobalQueue do slept += 1 Thread.sleep(1.nanosecond) # don't burn CPU end - rescue exception - Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" - ensure - shutdown.done end end ready.wait @@ -216,7 +207,7 @@ describe Fiber::ExecutionContext::GlobalQueue do Thread.sleep(10.nanoseconds) if i % 4 == 3 end - shutdown.wait + threads.each(&.join) # must have dequeued each fiber exactly X times (no less, no more) fibers.each { |fc| fc.counter.should eq(increments) } diff --git a/spec/std/fiber/execution_context/runnables_spec.cr b/spec/std/fiber/execution_context/runnables_spec.cr index 4c4a227e374f..0df82f3f5bda 100644 --- a/spec/std/fiber/execution_context/runnables_spec.cr +++ b/spec/std/fiber/execution_context/runnables_spec.cr @@ -1,4 +1,5 @@ require "./spec_helper" +require "../../../support/thread" describe Fiber::ExecutionContext::Runnables do it "#initialize" do @@ -190,14 +191,13 @@ describe Fiber::ExecutionContext::Runnables do global_queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) ready = Thread::WaitGroup.new(n) - shutdown = Thread::WaitGroup.new(n) all_runnables = Array(Fiber::ExecutionContext::Runnables(16)).new(n) do Fiber::ExecutionContext::Runnables(16).new(global_queue) end - n.times do |i| - Thread.new("RUN-#{i}") do |thread| + threads = Array(Thread).new(n) do |i| + new_thread("RUN-#{i}") do runnables = all_runnables[i] slept = 0 @@ -239,10 +239,6 @@ describe Fiber::ExecutionContext::Runnables do slept += 1 Thread.sleep(1.nanosecond) # don't burn CPU end - rescue exception - Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" - ensure - shutdown.done end end ready.wait @@ -255,7 +251,7 @@ describe Fiber::ExecutionContext::Runnables do Thread.sleep(10.nanoseconds) if i % 2 == 1 end - shutdown.wait + threads.map(&.join) # must have dequeued each fiber exactly X times (no less, no more) fibers.each { |fc| fc.counter.should eq(increments) } diff --git a/spec/std/file_spec.cr b/spec/std/file_spec.cr index e88adeed7ea2..337b489f5430 100644 --- a/spec/std/file_spec.cr +++ b/spec/std/file_spec.cr @@ -1,4 +1,5 @@ require "./spec_helper" +require "../support/thread" private def it_raises_on_null_byte(operation, file = __FILE__, line = __LINE__, end_line = __END_LINE__, &block) it "errors on #{operation}", file, line, end_line do @@ -109,7 +110,7 @@ describe "File" do # thread or process also opened the file; we should pass # O_NONBLOCK to the open(2) call itself, not afterwards file = nil - Thread.new { file = File.new(path, "w", blocking: nil) } + new_thread { file = File.new(path, "w", blocking: nil) } begin File.open(path, "r", blocking: false) do |file| diff --git a/spec/std/thread/condition_variable_spec.cr b/spec/std/thread/condition_variable_spec.cr index 1bf78f797357..f682b418e6f3 100644 --- a/spec/std/thread/condition_variable_spec.cr +++ b/spec/std/thread/condition_variable_spec.cr @@ -1,4 +1,5 @@ require "../spec_helper" +require "../../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread::ConditionVariable do @@ -7,7 +8,7 @@ pending_interpreted describe: Thread::ConditionVariable do cond = Thread::ConditionVariable.new mutex.synchronize do - Thread.new do + new_thread do mutex.synchronize { cond.signal } end @@ -22,7 +23,7 @@ pending_interpreted describe: Thread::ConditionVariable do waiting = 0 5.times do - Thread.new do + new_thread do mutex.synchronize do waiting += 1 cv1.wait(mutex) @@ -78,7 +79,7 @@ pending_interpreted describe: Thread::ConditionVariable do cond = Thread::ConditionVariable.new mutex.synchronize do - Thread.new do + new_thread do mutex.synchronize { cond.signal } end diff --git a/spec/std/thread/mutex_spec.cr b/spec/std/thread/mutex_spec.cr index 99f3c5d385c3..8fb8e484e935 100644 --- a/spec/std/thread/mutex_spec.cr +++ b/spec/std/thread/mutex_spec.cr @@ -1,4 +1,5 @@ require "../spec_helper" +require "../../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread::Mutex do @@ -7,7 +8,7 @@ pending_interpreted describe: Thread::Mutex do mutex = Thread::Mutex.new threads = Array.new(10) do - Thread.new do + new_thread do mutex.synchronize { a += 1 } end end @@ -22,7 +23,7 @@ pending_interpreted describe: Thread::Mutex do mutex.try_lock.should be_false expect_raises(RuntimeError) { mutex.lock } mutex.unlock - Thread.new { mutex.synchronize { } }.join + new_thread { mutex.synchronize { } }.join end it "won't unlock from another thread" do @@ -30,7 +31,7 @@ pending_interpreted describe: Thread::Mutex do mutex.lock expect_raises(RuntimeError) do - Thread.new { mutex.unlock }.join + new_thread { mutex.unlock }.join end mutex.unlock diff --git a/spec/std/thread_spec.cr b/spec/std/thread_spec.cr index bdfc31a9a65c..9875db074909 100644 --- a/spec/std/thread_spec.cr +++ b/spec/std/thread_spec.cr @@ -1,16 +1,17 @@ require "./spec_helper" +require "../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread do it "allows passing an argumentless fun to execute" do a = 0 - thread = Thread.new { a = 1; 10 } + thread = new_thread { a = 1; 10 } thread.join a.should eq(1) end it "raises inside thread and gets it on join" do - thread = Thread.new { raise "OH NO" } + thread = new_thread { raise "OH NO" } expect_raises Exception, "OH NO" do thread.join end @@ -18,7 +19,7 @@ pending_interpreted describe: Thread do it "returns current thread object" do current = nil - thread = Thread.new { current = Thread.current } + thread = new_thread { current = Thread.current } thread.join current.should be(thread) current.should_not be(Thread.current) @@ -31,7 +32,7 @@ pending_interpreted describe: Thread do it "yields the processor" do done = false - thread = Thread.new do + thread = new_thread do 3.times { Thread.yield } done = true end @@ -51,7 +52,7 @@ pending_interpreted describe: Thread do {% end %} name = nil - thread = Thread.new(name: "some-name") do + thread = new_thread(name: "some-name") do name = Thread.current.name end thread.name.should eq("some-name") diff --git a/spec/support/thread.cr b/spec/support/thread.cr new file mode 100644 index 000000000000..dc61ba9257c5 --- /dev/null +++ b/spec/support/thread.cr @@ -0,0 +1,10 @@ +{% begin %} + def new_thread(name = nil, &block) : Thread + {% if flag?(:execution_context) %} + ctx = Fiber::ExecutionContext::Isolated.new(name: name || "SPEC") { block.call } + ctx.@thread + {% else %} + Thread.new(name) { block.call } + {% end %} + end +{% end %} diff --git a/src/crystal/system/win32/file_descriptor.cr b/src/crystal/system/win32/file_descriptor.cr index 4a99d82e9134..333f3c5a617d 100644 --- a/src/crystal/system/win32/file_descriptor.cr +++ b/src/crystal/system/win32/file_descriptor.cr @@ -519,7 +519,11 @@ private module ConsoleUtils @@read_requests = Deque(ReadRequest).new @@bytes_read = Deque(Int32).new @@mtx = ::Thread::Mutex.new - @@reader_thread = ::Thread.new { reader_loop } + {% if flag?(:execution_context) %} + @@reader_thread = ::Fiber::ExecutionContext::Isolated.new("READER-LOOP") { reader_loop } + {% else %} + @@reader_thread = ::Thread.new { reader_loop } + {% end %} private def self.reader_loop while true diff --git a/src/fiber/execution_context/isolated.cr b/src/fiber/execution_context/isolated.cr new file mode 100644 index 000000000000..56e3f60997a8 --- /dev/null +++ b/src/fiber/execution_context/isolated.cr @@ -0,0 +1,229 @@ +require "./scheduler" +require "../list" + +module Fiber::ExecutionContext + # ST scheduler. Owns a single thread running a single fiber. + # + # Concurrency is disabled within the thread: the fiber owns the thread and the + # thread can only run this fiber. Keep in mind that the fiber will still run + # in parallel to other fibers running in other execution contexts. + # + # The fiber can still spawn fibers into other execution contexts. Since it can + # be inconvenient to pass an execution context around, calls to `::spawn` will + # spawn a fiber into the specified *spawn_context* that defaults to the + # default execution context. + # + # Isolated fibers can normally communicate with other fibers running in other + # execution contexts using `Channel(T)`, `WaitGroup` or `Mutex` for example. + # They can also execute IO operations or sleep just like any other fiber. + # + # Calls that result in waiting (e.g. sleep, or socket read/write) will block + # the thread since there are no other fibers to switch to. This in turn allows + # to call anything that would block the thread without blocking any other + # fiber. + # + # You can for example use an isolated fiber to run a blocking GUI loop, + # transparently forward `::spawn` to the default context, and eventually only + # block the current fiber while waiting for the GUI application to quit: + # + # ``` + # gtk = Fiber::ExecutionContext::Isolated.new("Gtk") do + # Gtk.main + # end + # gtk.wait + # ``` + class Isolated + include ExecutionContext + include ExecutionContext::Scheduler + + getter name : String + + @mutex : Thread::Mutex + protected getter thread : Thread + @main_fiber : Fiber + + getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create + + getter? running : Bool = true + @enqueued = false + @waiting = false + + @wait_list = Crystal::PointerLinkedList(Fiber::PointerLinkedListNode).new + @exception : Exception? + + # Starts a new thread named *name* to execute *func*. Once *func* returns + # the thread will terminate. + def initialize(@name : String, @spawn_context : ExecutionContext = ExecutionContext.default, &@func : ->) + @mutex = Thread::Mutex.new + @thread = uninitialized Thread + @main_fiber = uninitialized Fiber + @thread = start_thread + ExecutionContext.execution_contexts.push(self) + end + + private def start_thread : Thread + Thread.new(name: @name) do |thread| + @thread = thread + thread.execution_context = self + thread.scheduler = self + thread.main_fiber.name = @name + @main_fiber = thread.main_fiber + run + end + end + + # :nodoc: + def execution_context : Isolated + self + end + + # :nodoc: + def stack_pool : Fiber::StackPool + raise RuntimeError.new("No stack pool for isolated contexts") + end + + # :nodoc: + def stack_pool? : Fiber::StackPool? + end + + def spawn(*, name : String? = nil, &block : ->) : Fiber + @spawn_context.spawn(name: name, &block) + end + + # :nodoc: + def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread + @spawn_context.spawn(name: name, &block) + end + + def enqueue(fiber : Fiber) : Nil + Crystal.trace :sched, "enqueue", fiber: fiber, context: self + + unless fiber == @main_fiber + raise RuntimeError.new("Concurrency is disabled in isolated contexts") + end + + @mutex.synchronize do + raise RuntimeError.new("Can't resume dead fiber") unless @running + + @enqueued = true + + if @waiting + # wake up the blocked thread + @waiting = false + @event_loop.interrupt + else + # race: enqueued before the other thread started waiting + end + end + end + + protected def reschedule : Nil + Crystal.trace :sched, "reschedule" + + loop do + @mutex.synchronize do + # race: another thread already re-enqueued the fiber + if @enqueued + Crystal.trace :sched, "resume" + @enqueued = false + @waiting = false + return + end + @waiting = true + end + + # wait on the event loop + list = Fiber::List.new + @event_loop.run(pointerof(list), blocking: true) + + if fiber = list.pop? + break if fiber == @main_fiber && list.empty? + raise RuntimeError.new("Concurrency is disabled in isolated contexts") + end + + # the evloop got interrupted: restart + end + + # cleanup + @mutex.synchronize do + @waiting = false + @enqueued = false + end + + Crystal.trace :sched, "resume" + end + + protected def resume(fiber : Fiber) : Nil + # in theory we could resume @main_fiber, but this method may only be + # called from the current execution context, which is @main_fiber; it + # doesn't make any sense for a fiber to resume itself + raise RuntimeError.new("Can't resume #{fiber} in #{self}") + end + + private def run + Crystal.trace :sched, "started" + @func.call + rescue exception + @exception = exception + ensure + @mutex.synchronize do + @running = false + @wait_list.consume_each(&.value.enqueue) + end + ExecutionContext.execution_contexts.delete(self) + end + + # Blocks the calling fiber until the isolated context fiber terminates. + # Returns immediately if the isolated fiber has already terminated. + # Re-raises unhandled exceptions raised by the fiber. + # + # For example: + # + # ``` + # ctx = Fiber::ExecutionContext::Isolated.new("test") do + # raise "fail" + # end + # ctx.wait # => re-raises "fail" + # ``` + def wait : Nil + if @running + node = Fiber::PointerLinkedListNode.new(Fiber.current) + @mutex.synchronize do + @wait_list.push(pointerof(node)) if @running + end + end + + if exception = @exception + raise exception + end + end + + # :nodoc: + # + # Simple alias to ease the transition from Thread. + def join + wait + end + + def inspect(io : IO) : Nil + to_s(io) + end + + def to_s(io : IO) : Nil + io << "#<" << self.class.name << ":0x" + object_id.to_s(io, 16) + io << ' ' << name << '>' + end + + def status : String + if @waiting + "event-loop" + elsif @running + "running" + else + "shutdown" + end + end + end +end