Closed
56 changes: 42 additions & 14 deletions src/crystal/system/win32/event_loop_iocp.cr
@@ -1,14 +1,30 @@
require "c/iocp"
require "crystal/system/print_error"

module Crystal::EventLoop
@@queue = Deque(Fiber).new
@@queue = Deque(Crystal::Event).new

# Runs the event loop.
def self.run_once : Nil
next_fiber = @@queue.pop?
unless @@queue.empty?
next_event = @@queue.min_by { |e| e.wake_in }
time_elapsed = (Time.monotonic - next_event.slept_at)

if next_fiber
Crystal::Scheduler.enqueue next_fiber
unless time_elapsed > next_event.wake_in
sleepy_time = (next_event.wake_in - time_elapsed).total_milliseconds.to_i
io_entry = Slice.new(1, LibC::OVERLAPPED_ENTRY.new)

if LibC.GetQueuedCompletionStatusEx(Thread.current.iocp, io_entry, 1, out removed, sleepy_time, false)
if removed == 1 && io_entry.first.lpOverlapped
next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event)
end
Comment on lines +17 to +20
Contributor

If removed is 0, this means that the sleepy_time passed without hitting any completed events, right? If so, if I read the code further down the min_value above is enqueued in the scheduler at line 27. But does anything actually cancel the completion of the event or keep track that it has timed out? Will it continue and eventually be completed? Because if it will then you will have a strange situation potentially enqueuing a fiber in unknown status unless I'm totally confused.

Contributor Author

@neatorobito Nov 27, 2020

> If removed is 0, this means that the sleepy_time passed without hitting any completed events, right?

Not exactly. If the GetQueuedCompletionStatusEx call times out and returns true with removed set to 0, that means a sleep event completed. The idea was to treat sleep events the same as real I/O and use GetQueuedCompletionStatusEx's timeout as the mechanism for completing sleep events.

> If so, if I read the code further down the min_value above is enqueued in the scheduler at line 27. But does anything actually cancel the completion of the event or keep track that it has timed out?

Yes, we then signal completion by removing the event from the event loop queue and telling the scheduler to switch to the event's associated fiber.

> Will it continue and eventually be completed? Because if it will then you will have a strange situation potentially enqueuing a fiber in unknown status unless I'm totally confused.

As another example, let's say we're in the middle of a 5-second sleep event, blocking on the GetQueuedCompletionStatusEx call, and some real I/O (like a socket or file read) comes in. In this case, GetQueuedCompletionStatusEx will return as soon as possible (likely before our 5-second sleep is up) with the information about the real I/O. The removed arg will be 1, and the entry's lpOverlapped pointer will be set. The structure it points to carries the Crystal event/fiber that started the real I/O. Since the 5-second sleep was interrupted, we can safely put aside handling the sleep event and instead set the real I/O as the next_event, dequeue it from the event loop, and tell the scheduler to run that associated fiber.

The sleep event that was put aside will get handled on the next iteration after we calculate time_elapsed (which will likely be a hair over 5 seconds), at which point we completely skip the GetQueuedCompletionStatusEx call and instead enqueue the fiber associated with the sleep event. Hopefully that example adds more context.
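
The timing decision described in this comment can be sketched in plain Crystal without any Windows calls. This is only an illustration: `SleepEntry` and `remaining_wait` are hypothetical names that do not appear in the PR, and the selection rule simply mirrors the `min_by` on `wake_in` from the diff.

```crystal
# Hypothetical stand-in for Crystal::Event; only the timing fields are modelled.
record SleepEntry, name : String, slept_at : Time::Span, wake_in : Time::Span

# Returns how long the loop would still have to block for `entry`, or nil
# when the entry is already overdue and the blocking wait can be skipped.
def remaining_wait(entry : SleepEntry, now : Time::Span) : Time::Span?
  elapsed = now - entry.slept_at
  elapsed > entry.wake_in ? nil : entry.wake_in - elapsed
end

queue = [
  SleepEntry.new("five-second sleep", 0.seconds, 5.seconds),
  SleepEntry.new("one-second sleep", 0.seconds, 1.seconds),
]

# Same selection rule as in the diff: the entry with the smallest wake_in.
next_entry = queue.min_by(&.wake_in)

# 400 ms after the sleep started there is still time left to block for.
early = remaining_wait(next_entry, 400.milliseconds)

# 1200 ms after the sleep started the entry is overdue, so no wait is needed.
late = remaining_wait(next_entry, 1200.milliseconds)

puts next_entry.name
puts early.inspect
puts late.inspect
```

In the PR the non-nil case corresponds to the value passed as the timeout to GetQueuedCompletionStatusEx, and the nil case to the path that enqueues the fiber directly.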

Contributor

> use GetQueuedCompletionStatusEx's timeout as the mechanism for completing sleep events

I see. I guess I got confused by having spent quite a lot of time staring at a different completion mechanism on Linux (io_uring) that does support arbitrary wait events directly.

OK, I don't know enough about Windows to say whether that, or using CreateTimerQueueTimer or something similar, would make the most sense. But it makes a lot more sense than my initial impression at least. :)

How would event timeouts work? I suppose there is some way of putting that information in the entry on submission, but how would the result be communicated and would there need to be some handling in here for that?

Contributor Author

@neatorobito Dec 9, 2020

> How would event timeouts work? I suppose there is some way of putting that information in the entry on submission, but how would the result be communicated and would there need to be some handling in here for that?

I missed this. When GetQueuedCompletionStatusEx is called and times out, it could be either a sleep event completing or an I/O timeout, and I'm not handling that second case. The IO::Evented instance passed in when creating read/write events has read_timed_out and write_timed_out properties on it.

I will add some logic in the event loop to check if next_event was a read/write event. If so, set that boolean accordingly and thereby communicate the result.
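
One possible shape for that check, as a sketch only. `FakeEventedIO`, `EventKind`, and `mark_timed_out` are hypothetical names introduced for illustration; the PR does not define them, and the real IO::Evented module is not used here.

```crystal
# Hypothetical stand-in for IO::Evented; only the two flags are modelled.
class FakeEventedIO
  property read_timed_out = false
  property write_timed_out = false
end

# Hypothetical tag describing what an event is waiting on.
enum EventKind
  Sleep
  Read
  Write
end

# Called when the wait expired without a completion packet for the event.
# Sleep events carry no I/O object, so there is nothing to flag for them.
def mark_timed_out(kind : EventKind, io : FakeEventedIO?) : Nil
  return unless io

  case kind
  when .read?
    io.read_timed_out = true
  when .write?
    io.write_timed_out = true
  end
end

io = FakeEventedIO.new
mark_timed_out(EventKind::Read, io)
puts io.read_timed_out
puts io.write_timed_out
```

The fiber resumed afterwards would then inspect the flag to tell a timeout apart from a completed operation.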

else
raise RuntimeError.from_winerror("Error getting i/o completion status")
Contributor

@cfsamson Dec 17, 2020

@incognitorobito I think you need to check the error code here, since a timeout will raise a RuntimeError in this case. See the GetQueuedCompletionStatusEx docs on the timeout parameter:

> The number of milliseconds that the caller is willing to wait for a completion packet to appear at the completion port. If a completion packet does not appear within the specified time, the function times out and returns FALSE.

We should call GetLastError and check if the error code is WAIT_TIMEOUT (code 258). If that's the case you know it was either an event that timed out or a timer which has expired.
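
A minimal sketch of that check, kept free of Windows bindings so it can run anywhere. `WaitOutcome` and `classify_wait` are hypothetical names; `succeeded` stands in for the BOOL returned by GetQueuedCompletionStatusEx and `last_error` for the value GetLastError would report after a FALSE return.

```crystal
# Value of WAIT_TIMEOUT as quoted in the comment above.
WAIT_TIMEOUT = 258_u32

# Hypothetical outcome categories; these names are not from the PR.
enum WaitOutcome
  Completion # a completion packet was dequeued
  TimedOut   # the wait elapsed without a packet
  Failed     # any other error
end

# Separates a plain timeout from a genuine failure instead of raising
# on every FALSE return.
def classify_wait(succeeded : Bool, last_error : UInt32) : WaitOutcome
  return WaitOutcome::Completion if succeeded
  last_error == WAIT_TIMEOUT ? WaitOutcome::TimedOut : WaitOutcome::Failed
end

puts classify_wait(true, 0_u32)
puts classify_wait(false, WAIT_TIMEOUT)
puts classify_wait(false, 5_u32)
```

With this split, only the `Failed` case would go through `RuntimeError.from_winerror`; the `TimedOut` case would fall through to waking the sleeping fiber or flagging the timed-out I/O.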

end
end

dequeue next_event
Crystal::Scheduler.enqueue next_event.fiber
else
Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n"
::exit
@@ -19,44 +35,56 @@ module Crystal::EventLoop
def self.after_fork : Nil
end

def self.enqueue(fiber : Fiber)
unless @@queue.includes?(fiber)
@@queue << fiber
def self.enqueue(event : Crystal::Event)
unless @@queue.includes?(event)
@@queue << event
end
end

def self.dequeue(fiber : Fiber)
@@queue.delete(fiber)
def self.dequeue(event : Crystal::Event)
@@queue.delete(event)
end

# Create a new resume event for a fiber.
def self.create_resume_event(fiber : Fiber) : Crystal::Event
enqueue(fiber)

Crystal::Event.new(fiber)
end

# Creates a write event for a file descriptor.
def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event
# TODO Set event's wake_in to write timeout.
Crystal::Event.new(Fiber.current)
end

# Creates a read event for a file descriptor.
def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event
# TODO Set event's wake_in to read timeout.
Crystal::Event.new(Fiber.current)
end
end

struct Crystal::Event
property slept_at : Time::Span
property wake_in : Time::Span
property fiber : Fiber

def initialize(@fiber : Fiber)
@wake_in = Time::Span::ZERO
@slept_at = Time::Span::ZERO
end

# Frees the event
def free : Nil
Crystal::EventLoop.dequeue(@fiber)
Crystal::EventLoop.dequeue(self)
end

def add(time_span : Time::Span) : Nil
@slept_at = Time.monotonic
@wake_in = time_span
Crystal::EventLoop.enqueue(self)
end

def add(time_span : Time::Span?) : Nil
Crystal::EventLoop.enqueue(@fiber)
def to_unsafe
WSAOVERLAPPED.new(self)
end
end
13 changes: 13 additions & 0 deletions src/crystal/system/win32/thread.cr
@@ -8,6 +8,7 @@ class Thread
@exception : Exception?
@detached = Atomic(UInt8).new(0)
@main_fiber : Fiber?
getter iocp = LibC::HANDLE.null

# :nodoc:
property next : Thread?
@@ -21,9 +22,21 @@ class Thread

def initialize
@main_fiber = Fiber.new(stack_address, self)
@iocp = LibC.CreateIoCompletionPort(LibC::INVALID_HANDLE_VALUE, nil, 0, 0)

if (@iocp == LibC::HANDLE.null)
raise RuntimeError.from_winerror("Failed to create i/o completion port for thread")
end

@@threads.push(self)
end

def finalize
if LibC.CloseHandle(@iocp) == 0
raise RuntimeError.from_winerror("Failed to close i/o completion port for thread")
end
end

@@current : Thread? = nil

# Associates the Thread object to the running system thread.
53 changes: 53 additions & 0 deletions src/lib_c/x86_64-windows-msvc/c/iocp.cr
@@ -0,0 +1,53 @@
require "c/winnt"

# WSAOVERLAPPED is the primary communication structure for async I/O on Windows.
# See https://docs.microsoft.com/en-us/windows/win32/api/winsock2/ns-winsock2-wsaoverlapped
@[Extern]
struct WSAOVERLAPPED
internal : LibC::ULONG_PTR
internalHigh : LibC::ULONG_PTR
offset : LibC::DWORD
offsetHigh : LibC::DWORD
hEvent : LibC::HANDLE
property cEvent : Void*

def initialize(crystal_event : Crystal::Event)
@cEvent = crystal_event.unsafe_as(Pointer(Void))
Contributor

@kubo Dec 25, 2020

This will work only when the size of Crystal::Event is less than or equal to 8 bytes.

Crystal::Event is converted to Pointer(Void) here and converted back by the following code in event_loop_iocp.cr:

       next_event = io_entry.first.lpOverlapped.value.cEvent.unsafe_as(Crystal::Event)

I suspected that only the first 8 bytes survive the round trip, so I wrote the following code and ran it.

struct DummyEvent
  property buf : UInt8[32] = StaticArray(UInt8, 32).new { |i| i.to_u8 }
end

crystal_event = DummyEvent.new
puts(crystal_event)
cEvent = crystal_event.unsafe_as(Pointer(Void))
puts(cEvent.unsafe_as(DummyEvent))

This printed:

DummyEvent(@buf=StaticArray[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31])
DummyEvent(@buf=StaticArray[0, 1, 2, 3, 4, 5, 6, 7, 7, 226, 105, 253, 3, 2, 0, 0, 10, 251, 215, 22, 252, 0, 0, 0, 9, 116, 106, 253, 3, 2, 0, 0])

Only the first 8 bytes were restored. The remaining bytes appear to be uninitialized.

Contributor

It's been a long time since I worked with pointers in Crystal, but it seems like this gives the correct result:

Store the pointer to Crystal::Event:

def initialize(crystal_event : Crystal::Event)
  @cEvent = pointerof(crystal_event)
  ...

Get the pointed-to value back:

next_event = io_entry.first.lpOverlapped.value.cEvent.value

Member

When you do unsafe_as(Pointer(Void)) the value is interpreted as a pointer. That doesn't work because crystal_event is not a pointer, but the actual value.

pointerof(crystal_event) isn't reliable because it points at the stack which will be invalidated once the initialize method returns. You either need to ensure that the stack pointer is valid during the lifetime of its use (not sure if that's the case here), or put it on the heap.
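
For the heap option, one approach that may fit is the standard library's `Box`, sketched here with the `DummyEvent` struct from the earlier comment. This is an illustration, not what the PR does, and it assumes something on the Crystal side keeps the boxed value reachable for as long as the raw pointer is in use (here the local variable `boxed` does).

```crystal
struct DummyEvent
  property buf : UInt8[32] = StaticArray(UInt8, 32).new { |i| i.to_u8 }
end

event = DummyEvent.new

# Box.box copies the struct into a heap-allocated Box and returns a Void*.
# Unlike unsafe_as(Pointer(Void)), the pointer refers to the value rather
# than reinterpreting the value's first bytes as an address.
boxed = Box.box(event)

# The struct is larger than a pointer, which is why reinterpreting it
# as a pointer loses data.
puts sizeof(DummyEvent)
puts sizeof(Pointer(Void))

# Box(T).unbox reads the full struct back out of the box.
restored = Box(DummyEvent).unbox(boxed)
puts restored.buf == event.buf
```

If the pointer were handed to the operating system and no Crystal reference remained, the garbage collector could reclaim the box, so a real event loop would need to hold on to it until the completion arrives.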

end
end
Contributor

I think that this struct should be moved to src/crystal/system/win32/event_loop_iocp.cr. That's because this has the additional property cEvent compared with WSAOVERLAPPED in C.

> WSAOVERLAPPED is the primary communication structure for async I/O on Windows.

I don't think so. The primary communication structure is OVERLAPPED. WSAOVERLAPPED is defined as #define WSAOVERLAPPED OVERLAPPED in winsock2.h and is used only in the Windows Sockets API; OVERLAPPED is used everywhere else.

In addition, OVERLAPPED is already defined in fileapi.cr. So I think that the struct should be changed as follows:

require "c/fileapi"

@[Extern]
struct Overlapped
  overlapped : LibC::OVERLAPPED
  property cEvent : Void*

  def initialize(crystal_event : Crystal::Event)
    @cEvent = crystal_event.unsafe_as(Pointer(Void))
  end
end


@[Link("advapi32")]
lib LibC
struct OVERLAPPED_ENTRY
lpCompletionKey : ULONG_PTR
lpOverlapped : WSAOVERLAPPED*
internal : ULONG_PTR
dwNumberOfBytesTransferred : DWORD
end

fun WSAGetLastError : Int

fun CreateIoCompletionPort(
fileHandle : HANDLE,
existingCompletionPort : HANDLE,
completionKey : ULONG_PTR,
numberOfConcurrentThreads : DWORD
) : HANDLE

fun GetQueuedCompletionStatus(
completionPort : HANDLE,
lpNumberOfBytesTransferred : DWORD*,
lpCompletionKey : ULONG_PTR*,
lpOverlapped : WSAOVERLAPPED*,
dwMilliseconds : DWORD
) : BOOL

fun GetQueuedCompletionStatusEx(
completionPort : HANDLE,
lpCompletionPortEntries : OVERLAPPED_ENTRY*,
ulCount : ULong,
ulNumEntriesRemoved : ULong*,
dwMilliseconds : DWORD,
fAlertable : BOOL
) : BOOL
end
Contributor

Should the definitions in this iocp.cr move to other places? Almost all definitions in this directory are placed in the .cr files corresponding to their .h files. For example, CreateFileW is in fileapi.cr because it is declared in fileapi.h, and _wmkdir is in dirent.cr because it is declared in dirent.h. That said, I think this is just a custom and not documented, and not all definitions comply. For example, OVERLAPPED is in fileapi.cr though it is declared in minwinbase.h.

I think that it is better to move them as follows:
OVERLAPPED_ENTRY (along with OVERLAPPED?) -> minwinbase.cr
CreateIoCompletionPort, GetQueuedCompletionStatus, GetQueuedCompletionStatusEx -> ioapiset.cr