diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index ad80330ef681..9d8cc2812ab2 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -22,60 +22,32 @@ module Crystal::EventLoop end end - def self.create_fd_write_event(io : IO::FileDescriptor, edge_triggered : Bool = false) + def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false) flags = LibEvent2::EventFlags::Write flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered - event = @@eb.new_event(io.fd, flags, io) do |s, flags, data| - fd_io = data.as(IO::FileDescriptor) - if flags.includes?(LibEvent2::EventFlags::Write) - fd_io.resume_write - elsif flags.includes?(LibEvent2::EventFlags::Timeout) - fd_io.resume_write(timed_out: true) - end - end - event - end - def self.create_fd_write_event(sock : Socket, edge_triggered : Bool = false) - flags = LibEvent2::EventFlags::Write - flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered - event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data| - sock_ref = data.as(Socket) + @@eb.new_event(io.fd, flags, io) do |s, flags, data| + io_ref = data.as(typeof(io)) if flags.includes?(LibEvent2::EventFlags::Write) - sock_ref.resume_write + io_ref.resume_write elsif flags.includes?(LibEvent2::EventFlags::Timeout) - sock_ref.resume_write(timed_out: true) + io_ref.resume_write(timed_out: true) end end - event end - def self.create_fd_read_event(io : IO::FileDescriptor, edge_triggered : Bool = false) + def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) flags = LibEvent2::EventFlags::Read flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered - event = @@eb.new_event(io.fd, flags, io) do |s, flags, data| - fd_io = data.as(IO::FileDescriptor) - if flags.includes?(LibEvent2::EventFlags::Read) - fd_io.resume_read - elsif flags.includes?(LibEvent2::EventFlags::Timeout) - fd_io.resume_read(timed_out: true) - end - end - event - end - def self.create_fd_read_event(sock : Socket, edge_triggered : Bool = false) - flags = LibEvent2::EventFlags::Read - flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered - event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data| - sock_ref = data.as(Socket) + @@eb.new_event(io.fd, flags, io) do |s, flags, data| + io_ref = data.as(typeof(io)) if flags.includes?(LibEvent2::EventFlags::Read) - sock_ref.resume_read + io_ref.resume_read elsif flags.includes?(LibEvent2::EventFlags::Timeout) - sock_ref.resume_read(timed_out: true) + io_ref.resume_read(timed_out: true) end end - event end private def self.dns_base diff --git a/src/crystal/system/unix/file_descriptor.cr b/src/crystal/system/unix/file_descriptor.cr index 0ee34c60885e..29cd8fbcb962 100644 --- a/src/crystal/system/unix/file_descriptor.cr +++ b/src/crystal/system/unix/file_descriptor.cr @@ -1,23 +1,20 @@ require "c/fcntl" +require "io/evented" # :nodoc: module Crystal::System::FileDescriptor - include IO::Syscall + include IO::Evented @fd : Int32 - @read_event : Crystal::Event? - @write_event : Crystal::Event? - private def unbuffered_read(slice : Bytes) - read_syscall_helper(slice, "Error reading file") do - # `to_i32` is acceptable because `Slice#size` is a Int32 - LibC.read(@fd, slice, slice.size).to_i32 + evented_read(slice, "Error reading file") do + LibC.read(@fd, slice, slice.size) end end private def unbuffered_write(slice : Bytes) - write_syscall_helper(slice, "Error writing file") do |slice| + evented_write(slice, "Error writing file") do |slice| LibC.write(@fd, slice, slice.size).tap do |return_code| if return_code == -1 && Errno.value == Errno::EBADF raise IO::Error.new "File not open for writing" @@ -109,25 +106,7 @@ module Crystal::System::FileDescriptor # Mark the handle open, since we had to have dup'd a live handle. @closed = false - # We are now pointing to a new file descriptor, we need to re-register - # events with libevent and enqueue readers and writers again. - @read_event.try &.free - @read_event = nil - - @write_event.try &.free - @write_event = nil - - reschedule_waiting - end - - private def add_read_event(timeout = @read_timeout) : Nil - event = @read_event ||= Crystal::EventLoop.create_fd_read_event(self) - event.add timeout - end - - private def add_write_event(timeout = @write_timeout) : Nil - event = @write_event ||= Crystal::EventLoop.create_fd_write_event(self) - event.add timeout + evented_reopen end private def system_close @@ -140,12 +119,7 @@ module Crystal::System::FileDescriptor end end ensure - @read_event.try &.free - @read_event = nil - @write_event.try &.free - @write_event = nil - - reschedule_waiting + evented_close end def self.pipe(read_blocking, write_blocking) diff --git a/src/io/syscall.cr b/src/io/evented.cr similarity index 65% rename from src/io/syscall.cr rename to src/io/evented.cr index 9cf35945f0a8..ae2895c57136 100644 --- a/src/io/syscall.cr +++ b/src/io/evented.cr @@ -1,6 +1,6 @@ {% skip_file if flag?(:win32) %} -module IO::Syscall +module IO::Evented @read_timed_out = false @write_timed_out = false @@ -10,6 +10,9 @@ module IO::Syscall @readers : Deque(Fiber)? @writers : Deque(Fiber)? + @read_event : Crystal::Event? + @write_event : Crystal::Event? + # Returns the time to wait when reading before raising an `IO::Timeout`. def read_timeout : Time::Span? @read_timeout @@ -42,11 +45,12 @@ module IO::Syscall write_timeout end - def read_syscall_helper(slice : Bytes, errno_msg : String) : Int32 + def evented_read(slice : Bytes, errno_msg : String) : Int32 loop do - bytes_read = yield + bytes_read = yield slice if bytes_read != -1 - return bytes_read + # `to_i32` is acceptable because `Slice#size` is an Int32 + return bytes_read.to_i32 end if Errno.value == Errno::EAGAIN @@ -56,12 +60,10 @@ module IO::Syscall end end ensure - if (readers = @readers) && !readers.empty? - add_read_event - end + resume_pending_readers end - def write_syscall_helper(slice : Bytes, errno_msg : String) : Nil + def evented_write(slice : Bytes, errno_msg : String) : Nil return if slice.empty? begin @@ -79,12 +81,19 @@ module IO::Syscall end end ensure - if (writers = @writers) && !writers.empty? - add_write_event - end + resume_pending_writers end end + def evented_send(slice : Bytes, errno_msg : String) : Int32 + bytes_written = yield slice + raise Errno.new(errno_msg) if bytes_written == -1 + # `to_i32` is acceptable because `Slice#size` is an Int32 + bytes_written.to_i32 + ensure + resume_pending_writers + end + # :nodoc: def resume_read(timed_out = false) @read_timed_out = timed_out @@ -103,13 +112,11 @@ module IO::Syscall end end - # :nodoc: - def wait_readable(timeout = @read_timeout) + protected def wait_readable(timeout = @read_timeout) wait_readable(timeout: timeout) { |err| raise err } end - # :nodoc: - def wait_readable(timeout = @read_timeout) + protected def wait_readable(timeout = @read_timeout) : Nil readers = (@readers ||= Deque(Fiber).new) readers << Fiber.current add_read_event(timeout) @@ -119,19 +126,18 @@ module IO::Syscall @read_timed_out = false yield Timeout.new("Read timed out") end - - nil end - private abstract def add_read_event(timeout = @read_timeout) + private def add_read_event(timeout = @read_timeout) : Nil + event = @read_event ||= Crystal::EventLoop.create_fd_read_event(self) + event.add timeout + end - # :nodoc: - def wait_writable(timeout = @write_timeout) + protected def wait_writable(timeout = @write_timeout) wait_writable(timeout: timeout) { |err| raise err } end - # :nodoc: - def wait_writable(timeout = @write_timeout) + protected def wait_writable(timeout = @write_timeout) : Nil writers = (@writers ||= Deque(Fiber).new) writers << Fiber.current add_write_event(timeout) @@ -141,13 +147,24 @@ module IO::Syscall @write_timed_out = false yield Timeout.new("Write timed out") end + end + + private def add_write_event(timeout = @write_timeout) : Nil + event = @write_event ||= Crystal::EventLoop.create_fd_write_event(self) + event.add timeout + end - nil + def evented_reopen + evented_close end - private abstract def add_write_event(timeout = @write_timeout) + def evented_close + @read_event.try &.free + @read_event = nil + + @write_event.try &.free + @write_event = nil - private def reschedule_waiting if readers = @readers Crystal::Scheduler.enqueue readers readers.clear @@ -158,4 +175,16 @@ module IO::Syscall writers.clear end end + + private def resume_pending_readers + if (readers = @readers) && !readers.empty? + add_read_event + end + end + + private def resume_pending_writers + if (writers = @writers) && !writers.empty? + add_write_event + end + end end diff --git a/src/io/file_descriptor.cr b/src/io/file_descriptor.cr index 04a4658686e6..5609e78e801f 100644 --- a/src/io/file_descriptor.cr +++ b/src/io/file_descriptor.cr @@ -1,4 +1,3 @@ -require "./syscall" require "crystal/system/file_descriptor" # An `IO` over a file descriptor. diff --git a/src/socket.cr b/src/socket.cr index f6e0953f809f..12753cadcb07 100644 --- a/src/socket.cr +++ b/src/socket.cr @@ -4,10 +4,11 @@ require "c/netinet/in" require "c/netinet/tcp" require "c/sys/socket" require "c/sys/un" +require "io/evented" class Socket < IO include IO::Buffered - include IO::Syscall + include IO::Evented class Error < Exception end @@ -39,9 +40,6 @@ class Socket < IO getter fd : Int32 - @read_event : Crystal::Event? - @write_event : Crystal::Event? - @closed : Bool getter family : Family @@ -263,15 +261,9 @@ class Socket < IO # sock.connect Socket::UNIXAddress.new("/tmp/service.sock") # sock.send(Bytes[0]) # ``` - def send(message) - slice = message.to_slice - bytes_sent = LibC.send(fd, slice.to_unsafe.as(Void*), slice.size, 0) - raise Errno.new("Error sending datagram") if bytes_sent == -1 - bytes_sent - ensure - # see IO::FileDescriptor#unbuffered_write - if (writers = @writers) && !writers.empty? - add_write_event + def send(message) : Int32 + evented_send(message.to_slice, "Error sending datagram") do |slice| + LibC.send(fd, slice.to_unsafe.as(Void*), slice.size, 0) end end @@ -283,11 +275,12 @@ class Socket < IO # sock.connect("example.com", 2000) # sock.send("text query", to: server) # ``` - def send(message, to addr : Address) + def send(message, to addr : Address) : Int32 slice = message.to_slice bytes_sent = LibC.sendto(fd, slice.to_unsafe.as(Void*), slice.size, 0, addr, addr.size) raise Errno.new("Error sending datagram to #{addr}") if bytes_sent == -1 - bytes_sent + # to_i32 is fine because string/slice sizes are an Int32 + bytes_sent.to_i32 end # Receives a text message from the previously bound address. @@ -322,27 +315,15 @@ class Socket < IO {bytes_read, Address.from(sockaddr, addrlen)} end - protected def recvfrom(message) + protected def recvfrom(bytes) sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*) addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage)) - loop do - bytes_read = LibC.recvfrom(fd, message.to_unsafe.as(Void*), message.size, 0, sockaddr, pointerof(addrlen)) - if bytes_read == -1 - if Errno.value == Errno::EAGAIN - wait_readable - else - raise Errno.new("Error receiving datagram") - end - else - return {bytes_read.to_i, sockaddr, addrlen} - end - end - ensure - # see IO::FileDescriptor#unbuffered_read - if (readers = @readers) && !readers.empty? - add_read_event + bytes_read = evented_read(bytes, "Error receiving datagram") do |slice| + LibC.recvfrom(fd, slice.to_unsafe.as(Void*), slice.size, 0, sockaddr, pointerof(addrlen)) end + + {bytes_read, sockaddr, addrlen} end # Calls `shutdown(2)` with `SHUT_RD` @@ -539,30 +520,17 @@ class Socket < IO end private def unbuffered_read(slice : Bytes) - read_syscall_helper(slice, "Error reading socket") do - # `to_i32` is acceptable because `Slice#size` is a Int32 + evented_read(slice, "Error reading socket") do LibC.recv(@fd, slice, slice.size, 0).to_i32 end end private def unbuffered_write(slice : Bytes) - write_syscall_helper(slice, "Error writing to socket") do |slice| + evented_write(slice, "Error writing to socket") do |slice| LibC.send(@fd, slice, slice.size, 0) end end - private def add_read_event(timeout = @read_timeout) - event = @read_event ||= Crystal::EventLoop.create_fd_read_event(self) - event.add timeout - nil - end - - private def add_write_event(timeout = @write_timeout) - event = @write_event ||= Crystal::EventLoop.create_fd_write_event(self) - event.add timeout - nil - end - private def unbuffered_rewind raise IO::Error.new("Can't rewind") end @@ -581,13 +549,7 @@ class Socket < IO end @closed = true - - @read_event.try &.free - @read_event = nil - @write_event.try &.free - @write_event = nil - - reschedule_waiting + evented_close raise err if err end