diff --git a/spec/std/concurrent/select_spec.cr b/spec/std/concurrent/select_spec.cr index 781bf6a0a46b..55794ad3b050 100644 --- a/spec/std/concurrent/select_spec.cr +++ b/spec/std/concurrent/select_spec.cr @@ -191,7 +191,7 @@ describe "select" do x.should eq 2 end - it "stress select with send/receive in multiple fibers" do + pending_win32 "stress select with send/receive in multiple fibers" do fibers = 4 msg_per_sender = 1000 ch = Array.new(fibers) { Array.new(fibers) { Channel(Int32).new } } diff --git a/spec/std/socket/tcp_socket_spec.cr b/spec/std/socket/tcp_socket_spec.cr index fc974f862445..3a3448bc495a 100644 --- a/spec/std/socket/tcp_socket_spec.cr +++ b/spec/std/socket/tcp_socket_spec.cr @@ -89,7 +89,7 @@ describe TCPSocket, tags: "network" do end end - pending_win32 "sync from server" do + it "sync from server" do port = unused_local_port TCPServer.open("::", port) do |server| @@ -155,4 +155,39 @@ describe TCPSocket, tags: "network" do end end end + + it "sends and receives messages" do + port = unused_local_port + + channel = Channel(Exception?).new + spawn do + TCPServer.open("::", port) do |server| + channel.send nil + sock = server.accept + sock.read_timeout = 3.second + sock.write_timeout = 3.second + + sock.gets(4).should eq("ping") + sock << "pong" + channel.send nil + end + rescue exc + channel.send exc + end + + if exc = channel.receive + raise exc + end + + TCPSocket.open("localhost", port) do |client| + client.read_timeout = 3.second + client.write_timeout = 3.second + client << "ping" + client.gets(4).should eq("pong") + end + + if exc = channel.receive + raise exc + end + end end diff --git a/spec/std/socket/unix_server_spec.cr b/spec/std/socket/unix_server_spec.cr index f860dbc7c9a4..0ddb580f4730 100644 --- a/spec/std/socket/unix_server_spec.cr +++ b/spec/std/socket/unix_server_spec.cr @@ -1,3 +1,4 @@ +{% skip_file if flag?(:win32) %} require "../spec_helper" require "socket" require "../../support/fibers" diff --git a/spec/std/socket/unix_socket_spec.cr b/spec/std/socket/unix_socket_spec.cr index 98278c177500..d48769c318b2 100644 --- a/spec/std/socket/unix_socket_spec.cr +++ b/spec/std/socket/unix_socket_spec.cr @@ -1,3 +1,4 @@ +{% skip_file if flag?(:win32) %} require "spec" require "socket" require "../../support/tempfile" diff --git a/spec/win32_std_spec.cr b/spec/win32_std_spec.cr index 3fd05217540c..bfb74a82270e 100644 --- a/spec/win32_std_spec.cr +++ b/spec/win32_std_spec.cr @@ -13,7 +13,7 @@ require "./std/big/number_spec.cr" require "./std/bit_array_spec.cr" require "./std/bool_spec.cr" require "./std/box_spec.cr" -# require "./std/channel_spec.cr" (failed codegen) +require "./std/channel_spec.cr" require "./std/char/reader_spec.cr" require "./std/char_spec.cr" require "./std/class_spec.cr" @@ -27,7 +27,7 @@ require "./std/compress/zip/zip_spec.cr" require "./std/compress/zlib/reader_spec.cr" require "./std/compress/zlib/stress_spec.cr" require "./std/compress/zlib/writer_spec.cr" -# require "./std/concurrent/select_spec.cr" (failed to run) +require "./std/concurrent/select_spec.cr" require "./std/concurrent_spec.cr" require "./std/crypto/bcrypt/base64_spec.cr" require "./std/crypto/bcrypt/password_spec.cr" diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index 34539a9c9de4..4da70e22b494 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -37,15 +37,29 @@ module Crystal::EventLoop next_event = @@queue.min_by { |e| e.wake_at } if next_event - sleep_time = next_event.wake_at - Time.monotonic + now = Time.monotonic - if sleep_time > Time::Span.zero - LibC.Sleep(sleep_time.total_milliseconds) + if next_event.wake_at > now + sleep_time = next_event.wake_at - now + timed_out = IO::Overlapped.wait_queued_completions(sleep_time.total_milliseconds) do |fiber| + Crystal::Scheduler.enqueue fiber + end + + return unless timed_out end dequeue next_event - Crystal::Scheduler.enqueue next_event.fiber + fiber = next_event.fiber + + unless fiber.dead? + if next_event.timeout? && (select_action = fiber.timeout_select_action) + fiber.timeout_select_action = nil + select_action.time_expired(fiber) + else + Crystal::Scheduler.enqueue fiber + end + end else Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n" ::exit @@ -80,14 +94,18 @@ module Crystal::EventLoop def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::Event Crystal::Event.new(Fiber.current) end + + def self.create_timeout_event(fiber) + Crystal::Event.new(fiber, timeout: true) + end end struct Crystal::Event getter fiber getter wake_at + getter? timeout - def initialize(@fiber : Fiber) - @wake_at = Time.monotonic + def initialize(@fiber : Fiber, @wake_at = Time.monotonic, *, @timeout = false) end # Frees the event @@ -95,6 +113,10 @@ struct Crystal::Event Crystal::EventLoop.dequeue(self) end + def delete + free + end + def add(time_span : Time::Span) : Nil @wake_at = Time.monotonic + time_span Crystal::EventLoop.enqueue(self) diff --git a/src/crystal/system/win32/socket.cr b/src/crystal/system/win32/socket.cr index 5519581a8dcb..334c98a894f3 100644 --- a/src/crystal/system/win32/socket.cr +++ b/src/crystal/system/win32/socket.cr @@ -341,7 +341,7 @@ module Crystal::System::Socket private def unbuffered_read(slice : Bytes) wsabuf = wsa_buffer(slice) - bytes_read = overlapped_read(fd, "WSARecv") do |overlapped| + bytes_read = overlapped_operation(fd, "WSARecv", read_timeout, connreset_is_error: false) do |overlapped| flags = 0_u32 LibC.WSARecv(fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), overlapped, nil) end diff --git a/src/io/overlapped.cr b/src/io/overlapped.cr index f1eed904a42a..79dba7897615 100644 --- a/src/io/overlapped.cr +++ b/src/io/overlapped.cr @@ -1,5 +1,6 @@ {% skip_file unless flag?(:win32) %} require "c/handleapi" +require "crystal/system/thread_linked_list" module IO::Overlapped @read_timeout : Time::Span? @@ -49,100 +50,187 @@ module IO::Overlapped end end - @[Extern] - struct OverlappedOperation - getter overlapped : LibC::WSAOVERLAPPED + def self.wait_queued_completions(timeout) + overlapped_entries = uninitialized LibC::OVERLAPPED_ENTRY[1] - def initialize(@overlapped) + if timeout > UInt64::MAX + timeout = LibC::INFINITE + else + timeout = timeout.to_u64 + end + result = LibC.GetQueuedCompletionStatusEx(Crystal::EventLoop.iocp, overlapped_entries, overlapped_entries.size, out removed, timeout, false) + if result == 0 + error = WinError.value + if timeout && error.wait_timeout? + return true + else + raise IO::Error.from_os_error("GetQueuedCompletionStatusEx", error) + end + end + + if removed == 0 + raise IO::Error.new("GetQueuedCompletionStatusEx returned 0") end - end - def create_operation - overlapped = LibC::WSAOVERLAPPED.new - OverlappedOperation.new(overlapped) + removed.times do |i| + OverlappedOperation.schedule(overlapped_entries[i].lpOverlapped) { |fiber| yield fiber } + end + + false end - def get_overlapped_result(socket, operation) - flags = 0_u32 - result = LibC.WSAGetOverlappedResult(socket, pointerof(operation).as(LibC::OVERLAPPED*), out bytes, false, pointerof(flags)) - if result.zero? - error = WinError.wsa_value - yield error + class OverlappedOperation + enum State + INITIALIZED + STARTED + DONE + CANCELLED + end + + @overlapped = LibC::WSAOVERLAPPED.new + @fiber : Fiber? = nil + @state : State = :initialized + property next : OverlappedOperation? + property previous : OverlappedOperation? + @@canceled = Thread::LinkedList(OverlappedOperation).new + + def self.run(socket) + operation = OverlappedOperation.new + begin + yield operation + ensure + operation.done(socket) + end + end + + def self.schedule(overlapped : LibC::WSAOVERLAPPED*) + start = overlapped.as(Pointer(UInt8)) - offsetof(OverlappedOperation, @overlapped) + operation = Box(OverlappedOperation).unbox(start.as(Pointer(Void))) + operation.schedule { |fiber| yield fiber } + end + + def start + raise Exception.new("Invalid state #{@state}") unless @state.initialized? + @fiber = Fiber.current + @state = State::STARTED + pointerof(@overlapped) + end + + def result(socket) + raise Exception.new("Invalid state #{@state}") unless @state.done? || @state.started? + flags = 0_u32 + result = LibC.WSAGetOverlappedResult(socket, pointerof(@overlapped), out bytes, false, pointerof(flags)) + if result.zero? + error = WinError.wsa_value + yield error + + raise IO::Error.from_os_error("WSAGetOverlappedResult", error) + end - raise IO::Error.from_os_error("WSAGetOverlappedResult", error) + bytes end - bytes + protected def schedule + case @state + when .started? + yield @fiber.not_nil! + @state = :done + when .cancelled? + @@canceled.delete(self) + else + raise Exception.new("Invalid state #{@state}") + end + end + + protected def done(socket) + case @state + when .started? + # Microsoft documentation: + # The application must not free or reuse the OVERLAPPED structure associated with the canceled I/O operations until they have completed + if LibC.CancelIoEx(LibC::HANDLE.new(socket), pointerof(@overlapped)) != 0 + @state = :cancelled + @@canceled.push(self) # to increase lifetime + end + end + end end # Returns `false` if the operation timed out. def schedule_overlapped(timeout : Time::Span?, line = __LINE__) : Bool - Crystal::EventLoop.wait_completion(timeout.try(&.total_milliseconds) || LibC::INFINITE) + if timeout + timeout_event = Crystal::Event.new(Fiber.current) + timeout_event.add(timeout) + else + timeout_event = Crystal::Event.new(Fiber.current, Time::Span::MAX) + end + Crystal::EventLoop.enqueue(timeout_event) + + Crystal::Scheduler.reschedule + + Crystal::EventLoop.dequeue(timeout_event) end def overlapped_operation(socket, method, timeout, connreset_is_error = true) - operation = create_operation - - result = yield pointerof(operation).as(LibC::OVERLAPPED*) + OverlappedOperation.run(socket) do |operation| + result = yield operation.start - if result == LibC::SOCKET_ERROR - error = WinError.wsa_value + if result == LibC::SOCKET_ERROR + error = WinError.wsa_value - unless error.wsa_io_pending? - raise IO::Error.from_os_error(method, error) + unless error.wsa_io_pending? + raise IO::Error.from_os_error(method, error) + end end - end - schedule_overlapped(timeout) + schedule_overlapped(timeout) - get_overlapped_result(socket, operation) do |error| - case error - when .wsa_io_incomplete? - raise TimeoutError.new("#{method} timed out") - when .wsaeconnreset? - return 0_u32 unless connreset_is_error + operation.result(socket) do |error| + case error + when .wsa_io_incomplete? + raise TimeoutError.new("#{method} timed out") + when .wsaeconnreset? + return 0_u32 unless connreset_is_error + end end end end def overlapped_connect(socket, method) - operation = create_operation - - yield pointerof(operation).as(LibC::OVERLAPPED*) - - unless schedule_overlapped(read_timeout) - return ::Socket::ConnectError.new(method) - end - - get_overlapped_result(socket, operation) do |error| - case error - when .wsa_io_incomplete?, .wsaeconnrefused? - return ::Socket::ConnectError.from_os_error(method, error) - when .error_operation_aborted? - # FIXME: Not sure why this is necessary - return ::Socket::ConnectError.from_os_error(method, error) + OverlappedOperation.run(socket) do |operation| + yield operation.start + + schedule_overlapped(read_timeout || 1.seconds) + + operation.result(socket) do |error| + case error + when .wsa_io_incomplete?, .wsaeconnrefused? + return ::Socket::ConnectError.from_os_error(method, error) + when .error_operation_aborted? + # FIXME: Not sure why this is necessary + return ::Socket::ConnectError.from_os_error(method, error) + end end - end - nil + nil + end end def overlapped_accept(socket, method) - operation = create_operation - - yield pointerof(operation).as(LibC::OVERLAPPED*) + OverlappedOperation.run(socket) do |operation| + yield operation.start - unless schedule_overlapped(read_timeout) - raise IO::TimeoutError.new("accept timed out") - end + unless schedule_overlapped(read_timeout) + raise IO::TimeoutError.new("accept timed out") + end - get_overlapped_result(socket, operation) do |error| - case error - when .wsa_io_incomplete?, .wsaenotsock? - return false + operation.result(socket) do |error| + case error + when .wsa_io_incomplete?, .wsaenotsock? + return false + end end - end - true + true + end end end diff --git a/src/lib_c/x86_64-windows-msvc/c/ioapiset.cr b/src/lib_c/x86_64-windows-msvc/c/ioapiset.cr index 986f65193c45..df42772fd20a 100644 --- a/src/lib_c/x86_64-windows-msvc/c/ioapiset.cr +++ b/src/lib_c/x86_64-windows-msvc/c/ioapiset.cr @@ -14,4 +14,11 @@ lib LibC dwMilliseconds : DWORD, fAlertable : BOOL ) : BOOL + fun CancelIoEx( + hFile : HANDLE, + lpOverlapped : OVERLAPPED* + ) : BOOL + fun CancelIo( + hFile : HANDLE + ) : BOOL end