diff --git a/spec/std/concurrent_spec.cr b/spec/std/concurrent_spec.cr index d3b8994f0768..644c79310aa8 100644 --- a/spec/std/concurrent_spec.cr +++ b/spec/std/concurrent_spec.cr @@ -66,4 +66,27 @@ describe "concurrent" do it "accepts method call with receiver" do typeof(spawn String.new) end + + {% if flag?(:darwin) %} + pending "schedules intermitting sleeps" + # TODO: This spec fails on darwin, even with highly increased sleep times. Needs investigation. + {% else %} + it "schedules intermitting sleeps" do + chan = Channel(Int32).new + spawn do + 3.times do |i| + sleep 40.milliseconds + chan.send (i + 1) + end + end + spawn do + 2.times do |i| + sleep 100.milliseconds + chan.send (i + 1) * 10 + end + end + + Array(Int32).new(5) { chan.receive }.should eq [1, 2, 10, 3, 20] + end + {% end %} end diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index 9fd2281b1027..34539a9c9de4 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -2,7 +2,7 @@ require "c/ioapiset" require "crystal/system/print_error" module Crystal::EventLoop - @@queue = Deque(Fiber).new + @@queue = Deque(Event).new # Returns the base IO Completion Port class_getter iocp : LibC::HANDLE do @@ -34,10 +34,18 @@ module Crystal::EventLoop # Runs the event loop. def self.run_once : Nil - next_fiber = @@queue.pop? + next_event = @@queue.min_by { |e| e.wake_at } - if next_fiber - Crystal::Scheduler.enqueue next_fiber + if next_event + sleep_time = next_event.wake_at - Time.monotonic + + if sleep_time > Time::Span.zero + LibC.Sleep(sleep_time.total_milliseconds) + end + + dequeue next_event + + Crystal::Scheduler.enqueue next_event.fiber else Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n" ::exit @@ -48,20 +56,18 @@ module Crystal::EventLoop def self.after_fork : Nil end - def self.enqueue(fiber : Fiber) - unless @@queue.includes?(fiber) - @@queue << fiber + def self.enqueue(event : Event) + unless @@queue.includes?(event) + @@queue << event end end - def self.dequeue(fiber : Fiber) - @@queue.delete(fiber) + def self.dequeue(event : Event) + @@queue.delete(event) end # Create a new resume event for a fiber. def self.create_resume_event(fiber : Fiber) : Crystal::Event - enqueue(fiber) - Crystal::Event.new(fiber) end @@ -77,15 +83,20 @@ module Crystal::EventLoop end struct Crystal::Event + getter fiber + getter wake_at + def initialize(@fiber : Fiber) + @wake_at = Time.monotonic end # Frees the event def free : Nil - Crystal::EventLoop.dequeue(@fiber) + Crystal::EventLoop.dequeue(self) end - def add(time_span : Time::Span?) : Nil - Crystal::EventLoop.enqueue(@fiber) + def add(time_span : Time::Span) : Nil + @wake_at = Time.monotonic + time_span + Crystal::EventLoop.enqueue(self) end end diff --git a/src/windows_stubs.cr b/src/windows_stubs.cr index d55c53099c4f..01783ab9e71b 100644 --- a/src/windows_stubs.cr +++ b/src/windows_stubs.cr @@ -89,11 +89,3 @@ end enum Signal KILL = 0 end - -def sleep(seconds : Number) - sleep(seconds.seconds) -end - -def sleep(time : Time::Span) - LibC.Sleep(time.total_milliseconds) -end