diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index 1a80eab6c1b2..d8481bbca22c 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -1,14 +1,30 @@ +require "c/iocp" require "crystal/system/print_error" module Crystal::EventLoop - @@queue = Deque(Fiber).new + @@queue = Deque(Crystal::Event).new # Runs the event loop. def self.run_once : Nil - next_fiber = @@queue.pop? + unless @@queue.empty? + next_event = @@queue.min_by { |e| e.wake_in } + time_elapsed = (Time.monotonic - next_event.slept_at) - if next_fiber - Crystal::Scheduler.enqueue next_fiber + unless time_elapsed > next_event.wake_in + sleepy_time = (next_event.wake_in - time_elapsed).total_milliseconds.to_i + io_entry = Slice.new(1, LibC::OVERLAPPED_ENTRY.new) + + if LibC.GetQueuedCompletionStatusEx(Thread.current.iocp, io_entry, 1, out removed, sleepy_time, false) + if removed == 1 && io_entry.first.lpOverlapped + next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event) + end + else + raise RuntimeError.from_winerror("Error getting i/o completion status") + end + end + + dequeue next_event + Crystal::Scheduler.enqueue next_event.fiber else Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n" ::exit @@ -19,44 +35,56 @@ module Crystal::EventLoop def self.after_fork : Nil end - def self.enqueue(fiber : Fiber) - unless @@queue.includes?(fiber) - @@queue << fiber + def self.enqueue(event : Crystal::Event) + unless @@queue.includes?(event) + @@queue << event end end - def self.dequeue(fiber : Fiber) - @@queue.delete(fiber) + def self.dequeue(event : Crystal::Event) + @@queue.delete(event) end # Create a new resume event for a fiber. def self.create_resume_event(fiber : Fiber) : Crystal::Event - enqueue(fiber) - Crystal::Event.new(fiber) end # Creates a write event for a file descriptor. def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event + # TODO Set event's wake_in to write timeout. Crystal::Event.new(Fiber.current) end # Creates a read event for a file descriptor. def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event + # TODO Set event's wake_in to read timeout. Crystal::Event.new(Fiber.current) end end struct Crystal::Event + property slept_at : Time::Span + property wake_in : Time::Span + property fiber : Fiber + def initialize(@fiber : Fiber) + @wake_in = Time::Span::ZERO + @slept_at = Time::Span::ZERO end # Frees the event def free : Nil - Crystal::EventLoop.dequeue(@fiber) + Crystal::EventLoop.dequeue(self) + end + + def add(time_span : Time::Span) : Nil + @slept_at = Time.monotonic + @wake_in = time_span + Crystal::EventLoop.enqueue(self) end - def add(time_span : Time::Span?) : Nil - Crystal::EventLoop.enqueue(@fiber) + def to_unsafe + WSAOVERLAPPED.new(self) end end diff --git a/src/crystal/system/win32/thread.cr b/src/crystal/system/win32/thread.cr index 1af2ee9660f2..b1beae9dfcb0 100644 --- a/src/crystal/system/win32/thread.cr +++ b/src/crystal/system/win32/thread.cr @@ -8,6 +8,7 @@ class Thread @exception : Exception? @detached = Atomic(UInt8).new(0) @main_fiber : Fiber? + getter iocp = LibC::HANDLE.null # :nodoc: property next : Thread? @@ -21,9 +22,21 @@ class Thread def initialize @main_fiber = Fiber.new(stack_address, self) + @iocp = LibC.CreateIoCompletionPort(LibC::INVALID_HANDLE_VALUE, nil, 0, 0) + + if (@iocp == LibC::HANDLE.null) + raise RuntimeError.from_winerror("Failed to create i/o completion port for thread") + end + @@threads.push(self) end + def finalize + if LibC.CloseHandle(@iocp) == 0 + raise RuntimeError.from_winerror("Failed to close i/o completion port for thread") + end + end + @@current : Thread? = nil # Associates the Thread object to the running system thread. diff --git a/src/lib_c/x86_64-windows-msvc/c/iocp.cr b/src/lib_c/x86_64-windows-msvc/c/iocp.cr new file mode 100644 index 000000000000..cd847d7873fc --- /dev/null +++ b/src/lib_c/x86_64-windows-msvc/c/iocp.cr @@ -0,0 +1,53 @@ +require "c/winnt" + +# WSAOVERLAPPED is the primary communication structure for async I/O on Windows. +# See https://docs.microsoft.com/en-us/windows/win32/api/winsock2/ns-winsock2-wsaoverlapped +@[Extern] +struct WSAOVERLAPPED + internal : LibC::ULONG_PTR + internalHigh : LibC::ULONG_PTR + offset : LibC::DWORD + offsetHigh : LibC::DWORD + hEvent : LibC::HANDLE + property cEvent : Void* + + def initialize(crystal_event : Crystal::Event) + @cEvent = crystal_event.unsafe_as(Pointer(Void)) + end +end + +@[Link("advapi32")] +lib LibC + struct OVERLAPPED_ENTRY + lpCompletionKey : ULONG_PTR + lpOverlapped : WSAOVERLAPPED* + internal : ULONG_PTR + dwNumberOfBytesTransferred : DWORD + end + + fun WSAGetLastError : Int + + fun CreateIoCompletionPort( + fileHandle : HANDLE, + existingCompletionPort : HANDLE, + completionKey : ULONG_PTR, + numberOfConcurrentThreads : DWORD + ) : HANDLE + + fun GetQueuedCompletionStatus( + completionPort : HANDLE, + lpNumberOfBytesTransferred : DWORD*, + lpCompletionKey : ULONG_PTR*, + lpOverlapped : WSAOVERLAPPED*, + dwMilliseconds : DWORD + ) : BOOL + + fun GetQueuedCompletionStatusEx( + completionPort : HANDLE, + lpCompletionPortEntries : OVERLAPPED_ENTRY*, + ulCount : ULong, + ulNumEntriesRemoved : ULong*, + dwMilliseconds : DWORD, + fAlertable : BOOL + ) : BOOL +end