From aa057ede8bbc0c5a13aa98c3ad622e26d5de619e Mon Sep 17 00:00:00 2001 From: Mark Raymond Jr Date: Sun, 22 Nov 2020 12:50:07 -0800 Subject: [PATCH 1/4] Add iocp plumbing --- src/lib_c/x86_64-windows-msvc/c/iocp.cr | 55 +++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 src/lib_c/x86_64-windows-msvc/c/iocp.cr diff --git a/src/lib_c/x86_64-windows-msvc/c/iocp.cr b/src/lib_c/x86_64-windows-msvc/c/iocp.cr new file mode 100644 index 000000000000..efbee1ff00d8 --- /dev/null +++ b/src/lib_c/x86_64-windows-msvc/c/iocp.cr @@ -0,0 +1,55 @@ +require "c/winnt" + +# WSAOVERLAPPED is the primary communication structure for async I/O on Windows. +# See https://docs.microsoft.com/en-us/windows/win32/api/winsock2/ns-winsock2-wsaoverlapped +@[Extern] +struct WSAOVERLAPPED + internal : LibC::ULONG_PTR + internalHigh : LibC::ULONG_PTR + offset : LibC::DWORD + offsetHigh : LibC::DWORD + hEvent : LibC::HANDLE + property cEvent : Void* + + def initialize(crystal_event : Crystal::Event) + @cEvent = crystal_event.unsafe_as(Pointer(Void)) + end +end + +@[Link("advapi32")] +lib LibC + + struct OVERLAPPED_ENTRY + lpCompletionKey : ULONG_PTR + lpOverlapped : WSAOVERLAPPED* + internal : ULONG_PTR + dwNumberOfBytesTransferred : DWORD + end + + fun WSAGetLastError() : Int + + fun CreateIoCompletionPort( + fileHandle : HANDLE, + existingCompletionPort : HANDLE, + completionKey : ULONG_PTR, + numberOfConcurrentThreads : DWORD + ) : HANDLE + + fun GetQueuedCompletionStatus( + completionPort : HANDLE, + lpNumberOfBytesTransferred : DWORD*, + lpCompletionKey : ULONG_PTR*, + lpOverlapped : WSAOVERLAPPED*, + dwMilliseconds : DWORD, + ) : BOOL + + fun GetQueuedCompletionStatusEx( + completionPort : HANDLE, + lpCompletionPortEntries : OVERLAPPED_ENTRY*, + ulCount : ULong, + ulNumEntriesRemoved : ULong*, + dwMilliseconds : DWORD, + fAlertable : BOOL + ) : BOOL + +end From 4c0a4db52ddadc2e9cda12d039c2a8608721bd74 Mon Sep 17 00:00:00 2001 From: Mark Raymond Jr Date: Sun, 22 Nov 2020 12:55:08 -0800 Subject: [PATCH 2/4] Implement basic iocp event loop for win32 --- src/crystal/system/win32/event_loop_iocp.cr | 56 +++++++++++++++------ src/crystal/system/win32/thread.cr | 15 +++++- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index 1a80eab6c1b2..2d71e4f50afb 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -1,14 +1,30 @@ +require "c/iocp" require "crystal/system/print_error" module Crystal::EventLoop - @@queue = Deque(Fiber).new + @@queue = Deque(Crystal::Event).new # Runs the event loop. def self.run_once : Nil - next_fiber = @@queue.pop? + unless @@queue.empty? + next_event = @@queue.min_by { |e| e.wake_in } + time_elapsed = (Time.monotonic - next_event.slept_at) - if next_fiber - Crystal::Scheduler.enqueue next_fiber + unless time_elapsed > next_event.wake_in + sleepy_time = (next_event.wake_in - time_elapsed).total_milliseconds.to_i + io_entry = Slice.new(1, LibC::OVERLAPPED_ENTRY.new) + + if LibC.GetQueuedCompletionStatusEx(Thread.current.iocp, io_entry, 1, out removed, sleepy_time, false) + if removed == 1 && io_entry.first.lpOverlapped + next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event) + end + else + raise RuntimeError.from_winerror("Error getting i/o completion status") + end + end + + dequeue next_event + Crystal::Scheduler.enqueue next_event.fiber else Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n" ::exit @@ -19,44 +35,56 @@ module Crystal::EventLoop def self.after_fork : Nil end - def self.enqueue(fiber : Fiber) - unless @@queue.includes?(fiber) - @@queue << fiber + def self.enqueue(event : Crystal::Event) + unless @@queue.includes?(event) + @@queue << event end end - def self.dequeue(fiber : Fiber) - @@queue.delete(fiber) + def self.dequeue(event : Crystal::Event) + @@queue.delete(event) end # Create a new resume event for a fiber. def self.create_resume_event(fiber : Fiber) : Crystal::Event - enqueue(fiber) - Crystal::Event.new(fiber) end # Creates a write event for a file descriptor. def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event + # TODO Set event's wake_in to write timeout. Crystal::Event.new(Fiber.current) end # Creates a read event for a file descriptor. def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event + # TODO Set event's wake_in to read timeout. Crystal::Event.new(Fiber.current) end end struct Crystal::Event + property slept_at : Time::Span + property wake_in : Time::Span + property fiber : Fiber + def initialize(@fiber : Fiber) + @wake_in = Time::Span::ZERO + @slept_at = Time::Span::ZERO end # Frees the event def free : Nil - Crystal::EventLoop.dequeue(@fiber) + Crystal::EventLoop.dequeue(self) end - def add(time_span : Time::Span?) : Nil - Crystal::EventLoop.enqueue(@fiber) + def add(time_span : Time::Span) : Nil + @slept_at = Time.monotonic + @wake_in = time_span + Crystal::EventLoop.enqueue(self) + end + + def to_unsafe + pointerof(LibC::WSAOVERLAPPED.new(self)) end end diff --git a/src/crystal/system/win32/thread.cr b/src/crystal/system/win32/thread.cr index 1af2ee9660f2..0a676752fd5c 100644 --- a/src/crystal/system/win32/thread.cr +++ b/src/crystal/system/win32/thread.cr @@ -8,7 +8,8 @@ class Thread @exception : Exception? @detached = Atomic(UInt8).new(0) @main_fiber : Fiber? - + getter iocp = LibC::HANDLE.null + # :nodoc: property next : Thread? @@ -21,9 +22,21 @@ class Thread def initialize @main_fiber = Fiber.new(stack_address, self) + @iocp = LibC.CreateIoCompletionPort(LibC::INVALID_HANDLE_VALUE, nil, 0, 0) + + if(@iocp == LibC::HANDLE.null) + raise RuntimeError.from_winerror("Failed to create i/o completion port for thread") + end + @@threads.push(self) end + def finalize + if LibC.CloseHandle(@iocp) == 0 + raise RuntimeError.from_winerror("Failed to close i/o completion port for thread") + end + end + @@current : Thread? = nil # Associates the Thread object to the running system thread. From c9a6ce19a6f2b2a56a738195d1787184d4db948a Mon Sep 17 00:00:00 2001 From: Mark Raymond Jr Date: Sun, 22 Nov 2020 13:53:40 -0800 Subject: [PATCH 3/4] Run formatting tool --- src/crystal/system/win32/event_loop_iocp.cr | 4 ++-- src/crystal/system/win32/thread.cr | 8 ++++---- src/lib_c/x86_64-windows-msvc/c/iocp.cr | 16 +++++++--------- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index 2d71e4f50afb..ce516d99f3ac 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -13,7 +13,7 @@ module Crystal::EventLoop unless time_elapsed > next_event.wake_in sleepy_time = (next_event.wake_in - time_elapsed).total_milliseconds.to_i io_entry = Slice.new(1, LibC::OVERLAPPED_ENTRY.new) - + if LibC.GetQueuedCompletionStatusEx(Thread.current.iocp, io_entry, 1, out removed, sleepy_time, false) if removed == 1 && io_entry.first.lpOverlapped next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event) @@ -83,7 +83,7 @@ struct Crystal::Event @wake_in = time_span Crystal::EventLoop.enqueue(self) end - + def to_unsafe pointerof(LibC::WSAOVERLAPPED.new(self)) end diff --git a/src/crystal/system/win32/thread.cr b/src/crystal/system/win32/thread.cr index 0a676752fd5c..b1beae9dfcb0 100644 --- a/src/crystal/system/win32/thread.cr +++ b/src/crystal/system/win32/thread.cr @@ -9,7 +9,7 @@ class Thread @detached = Atomic(UInt8).new(0) @main_fiber : Fiber? getter iocp = LibC::HANDLE.null - + # :nodoc: property next : Thread? @@ -23,9 +23,9 @@ class Thread def initialize @main_fiber = Fiber.new(stack_address, self) @iocp = LibC.CreateIoCompletionPort(LibC::INVALID_HANDLE_VALUE, nil, 0, 0) - - if(@iocp == LibC::HANDLE.null) - raise RuntimeError.from_winerror("Failed to create i/o completion port for thread") + + if (@iocp == LibC::HANDLE.null) + raise RuntimeError.from_winerror("Failed to create i/o completion port for thread") end @@threads.push(self) diff --git a/src/lib_c/x86_64-windows-msvc/c/iocp.cr b/src/lib_c/x86_64-windows-msvc/c/iocp.cr index efbee1ff00d8..cd847d7873fc 100644 --- a/src/lib_c/x86_64-windows-msvc/c/iocp.cr +++ b/src/lib_c/x86_64-windows-msvc/c/iocp.cr @@ -18,7 +18,6 @@ end @[Link("advapi32")] lib LibC - struct OVERLAPPED_ENTRY lpCompletionKey : ULONG_PTR lpOverlapped : WSAOVERLAPPED* @@ -26,12 +25,12 @@ lib LibC dwNumberOfBytesTransferred : DWORD end - fun WSAGetLastError() : Int - + fun WSAGetLastError : Int + fun CreateIoCompletionPort( - fileHandle : HANDLE, - existingCompletionPort : HANDLE, - completionKey : ULONG_PTR, + fileHandle : HANDLE, + existingCompletionPort : HANDLE, + completionKey : ULONG_PTR, numberOfConcurrentThreads : DWORD ) : HANDLE @@ -40,7 +39,7 @@ lib LibC lpNumberOfBytesTransferred : DWORD*, lpCompletionKey : ULONG_PTR*, lpOverlapped : WSAOVERLAPPED*, - dwMilliseconds : DWORD, + dwMilliseconds : DWORD ) : BOOL fun GetQueuedCompletionStatusEx( @@ -50,6 +49,5 @@ lib LibC ulNumEntriesRemoved : ULong*, dwMilliseconds : DWORD, fAlertable : BOOL - ) : BOOL - + ) : BOOL end From bc1a582a776aee59505072a761d234cfaf46d228 Mon Sep 17 00:00:00 2001 From: Mark Raymond Jr Date: Sun, 22 Nov 2020 14:58:25 -0800 Subject: [PATCH 4/4] Make to_unsafe directly create WSAOVERLAPPED --- src/crystal/system/win32/event_loop_iocp.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index ce516d99f3ac..d8481bbca22c 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -85,6 +85,6 @@ struct Crystal::Event end def to_unsafe - pointerof(LibC::WSAOVERLAPPED.new(self)) + WSAOVERLAPPED.new(self) end end