diff --git a/spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr b/spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr index 6227ad57028e..58bd19ef8f9b 100644 --- a/spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr +++ b/spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr @@ -3,6 +3,10 @@ require "spec" class Crystal::EventLoop::FakeLoop < Crystal::EventLoop::Polling + def self.default_socket_blocking? + false + end + getter operations = [] of {Symbol, Int32, Arena::Index | Bool} private def system_run(blocking : Bool, & : Fiber ->) : Nil diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index e6e1bb0b260d..d80be9d630a7 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -25,6 +25,10 @@ abstract class Crystal::EventLoop backend_class.new end + def self.default_socket_blocking? : Bool + backend_class.default_socket_blocking? + end + @[AlwaysInline] def self.current : self {% if flag?(:execution_context) %} diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index 7ae53a867590..73cb32953c89 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -10,6 +10,10 @@ require "./iocp/*" # :nodoc: class Crystal::EventLoop::IOCP < Crystal::EventLoop + def self.default_socket_blocking? + true + end + @waitable_timer : System::WaitableTimer? @timer_packet : LibC::HANDLE? @timer_key : System::IOCP::CompletionKey? @@ -289,6 +293,17 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop file_descriptor.file_descriptor_close end + def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool?) : {::Socket::Handle, Bool} + blocking = true if blocking.nil? + fd = System::Socket.socket(family, type, protocol, blocking) + create_completion_port LibC::HANDLE.new(fd) + {fd, blocking} + end + + def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : Tuple({::Socket::Handle, ::Socket::Handle}, Bool) + raise NotImplementedError.new("Crystal::EventLoop::IOCP#socketpair") + end + private def wsa_buffer(bytes) wsabuf = LibC::WSABUF.new wsabuf.len = bytes.size @@ -376,7 +391,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop end end - def accept(socket : ::Socket) : ::Socket::Handle? + def accept(socket : ::Socket) : {::Socket::Handle, Bool}? socket.system_accept do |client_handle| address_size = sizeof(LibC::SOCKADDR_STORAGE) + 16 diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index 6434768ee77d..82de530b9dd5 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -2,6 +2,10 @@ require "./libevent/event" # :nodoc: class Crystal::EventLoop::LibEvent < Crystal::EventLoop + def self.default_socket_blocking? + false + end + private getter(event_base) { Crystal::EventLoop::LibEvent::Event::Base.new } def after_fork_before_exec : Nil @@ -166,6 +170,16 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop file_descriptor.file_descriptor_close end + def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool?) : {::Socket::Handle, Bool} + socket = System::Socket.socket(family, type, protocol, !!blocking) + {socket, !!blocking} + end + + def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : Tuple({::Socket::Handle, ::Socket::Handle}, Bool) + socket = System::Socket.socketpair(type, protocol, blocking: false) + {socket, false} + end + def read(socket : ::Socket, slice : Bytes) : Int32 evented_read(socket, "Error reading socket") do LibC.recv(socket.fd, slice, slice.size, 0).to_i32 @@ -231,11 +245,11 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop end end - def accept(socket : ::Socket) : ::Socket::Handle? + def accept(socket : ::Socket) : {::Socket::Handle, Bool}? loop do client_fd = {% if LibC.has_method?(:accept4) %} - LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC) + LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC | LibC::SOCK_NONBLOCK) {% else %} # we may fail to set FD_CLOEXEC between `accept` and `fcntl` but we # can't call `Crystal::System::Socket.lock_read` because the socket @@ -246,7 +260,10 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop # could change the socket back to blocking mode between the condition # check and the `accept` call. fd = LibC.accept(socket.fd, nil, nil) - Crystal::System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) unless fd == -1 + unless fd == -1 + System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) + System::Socket.fcntl(fd, LibC::F_SETFL, System::Socket.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK) + end fd {% end %} @@ -262,7 +279,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop raise ::Socket::Error.from_errno("accept") end else - return client_fd + return {client_fd, false} end end end diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index da225b832073..285d351ca7c8 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -56,6 +56,10 @@ end # before suspending the fiber, then after resume it will raise # `IO::TimeoutError` if the event timed out, and continue otherwise. abstract class Crystal::EventLoop::Polling < Crystal::EventLoop + def self.default_socket_blocking? + false + end + # The generational arena: # # 1. decorrelates the fd from the IO since the evloop only really cares about @@ -226,6 +230,16 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop # socket interface, see Crystal::EventLoop::Socket + def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool?) : {::Socket::Handle, Bool} + socket = System::Socket.socket(family, type, protocol, !!blocking) + {socket, !!blocking} + end + + def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : Tuple({::Socket::Handle, ::Socket::Handle}, Bool) + socket = System::Socket.socketpair(type, protocol, blocking: false) + {socket, false} + end + def read(socket : ::Socket, slice : Bytes) : Int32 size = evented_read(socket, slice, socket.@read_timeout) raise IO::Error.from_errno("read", target: socket) if size == -1 @@ -250,11 +264,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop end end - def accept(socket : ::Socket) : ::Socket::Handle? + def accept(socket : ::Socket) : {::Socket::Handle, Bool}? loop do client_fd = {% if LibC.has_method?(:accept4) %} - LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC) + LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC | LibC::SOCK_NONBLOCK) {% else %} # we may fail to set FD_CLOEXEC between `accept` and `fcntl` but we # can't call `Crystal::System::Socket.lock_read` because the socket @@ -265,11 +279,14 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop # could change the socket back to blocking mode between the condition # check and the `accept` call. LibC.accept(socket.fd, nil, nil).tap do |fd| - System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) unless fd == -1 + unless fd == -1 + System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) + System::Socket.fcntl(fd, LibC::F_SETFL, System::Socket.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK) + end end {% end %} - return client_fd unless client_fd == -1 + return {client_fd, false} unless client_fd == -1 return if socket.closed? if Errno.value == Errno::EAGAIN diff --git a/src/crystal/event_loop/socket.cr b/src/crystal/event_loop/socket.cr index 2e3679e615c5..f31a1d6cdd70 100644 --- a/src/crystal/event_loop/socket.cr +++ b/src/crystal/event_loop/socket.cr @@ -4,6 +4,14 @@ abstract class Crystal::EventLoop module Socket + # Creates a new socket file descriptor or handle and returns it, along with + # whether the blocking flag has been set. + abstract def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool?) : {::Socket::Handle, Bool} + + # Creates a pair of UNIX socket file descriptors or handles and returns + # them, along with whether the blocking mode has been set. + abstract def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : Tuple({::Socket::Handle, ::Socket::Handle}, Bool) + # Reads at least one byte from the socket into *slice*. # # Blocks the current fiber if no data is available for reading, continuing @@ -37,7 +45,7 @@ abstract class Crystal::EventLoop # becomes available. Otherwise returns immediately. # # Returns a handle to the socket for the new connection. - abstract def accept(socket : ::Socket) : ::Socket::Handle? + abstract def accept(socket : ::Socket) : {::Socket::Handle, Bool}? # Opens a connection on *socket* to the target *address*. # diff --git a/src/crystal/event_loop/wasi.cr b/src/crystal/event_loop/wasi.cr index 60ba11d738b0..b1c46e24298a 100644 --- a/src/crystal/event_loop/wasi.cr +++ b/src/crystal/event_loop/wasi.cr @@ -1,5 +1,9 @@ # :nodoc: class Crystal::EventLoop::Wasi < Crystal::EventLoop + def self.default_socket_blocking? + false + end + # Runs the event loop. def run(blocking : Bool) : Bool raise NotImplementedError.new("Crystal::Wasi::EventLoop.run") @@ -73,6 +77,14 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop file_descriptor.file_descriptor_close end + def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol) : {::Socket::Handle, Bool} + raise NotImplementedError.new("Crystal::EventLoop::Wasi#socket") + end + + def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool) : {Handle, Handle} + raise NotImplementedError.new("Crystal::EventLoop::Wasi#socketpair") + end + def read(socket : ::Socket, slice : Bytes) : Int32 evented_read(socket, "Error reading socket") do LibC.recv(socket.fd, slice, slice.size, 0).to_i32 diff --git a/src/crystal/system/socket.cr b/src/crystal/system/socket.cr index 54648f17f7db..be1c3afa8b78 100644 --- a/src/crystal/system/socket.cr +++ b/src/crystal/system/socket.cr @@ -1,9 +1,6 @@ require "../event_loop/socket" module Crystal::System::Socket - # Creates a file descriptor / socket handle - # private def create_handle(family, type, protocol, blocking) : Handle - # Initializes a file descriptor / socket handle for use with Crystal Socket # private def initialize_handle(fd) @@ -17,10 +14,6 @@ module Crystal::System::Socket # private def system_listen(backlog) - private def system_accept - event_loop.accept(self) - end - private def system_send_to(bytes : Bytes, addr : ::Socket::Address) event_loop.send_to(self, bytes, addr) end @@ -79,7 +72,7 @@ module Crystal::System::Socket # def self.fcntl(fd, cmd, arg = 0) - # def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : {Handle, Handle} + # def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool) : {Handle, Handle} private def system_read(slice : Bytes) : Int32 event_loop.read(self, slice) diff --git a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr index 43bfcd8ad26e..5f7307c46cc6 100644 --- a/src/crystal/system/unix/socket.cr +++ b/src/crystal/system/unix/socket.cr @@ -9,9 +9,11 @@ module Crystal::System::Socket alias Handle = Int32 - private def create_handle(family, type, protocol, blocking) : Handle + def self.socket(family, type, protocol, blocking) : Handle {% if LibC.has_constant?(:SOCK_CLOEXEC) %} - fd = LibC.socket(family, type.value | LibC::SOCK_CLOEXEC, protocol) + flags = type.value | LibC::SOCK_CLOEXEC + flags |= LibC::SOCK_NONBLOCK unless blocking + fd = LibC.socket(family, flags, protocol) raise ::Socket::Error.from_errno("Failed to create socket") if fd == -1 fd {% else %} @@ -19,12 +21,13 @@ module Crystal::System::Socket fd = LibC.socket(family, type, protocol) raise ::Socket::Error.from_errno("Failed to create socket") if fd == -1 Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) + Socket.fcntl(fd, LibC::F_SETFL, Socket.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK) unless blocking fd end {% end %} end - private def initialize_handle(fd) + private def initialize_handle(fd, blocking = nil) {% if Crystal::EventLoop.has_constant?(:Polling) %} @__evloop_data = Crystal::EventLoop::Polling::Arena::INVALID_INDEX {% end %} @@ -182,11 +185,13 @@ module Crystal::System::Socket r end - def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : {Handle, Handle} + def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool) : {Handle, Handle} fds = uninitialized Handle[2] {% if LibC.has_constant?(:SOCK_CLOEXEC) %} - if LibC.socketpair(::Socket::Family::UNIX, type.value | LibC::SOCK_CLOEXEC, protocol, fds) == -1 + flags = type.value | LibC::SOCK_CLOEXEC + flags |= LibC::SOCK_NONBLOCK unless blocking + if LibC.socketpair(::Socket::Family::UNIX, flags, protocol, fds) == -1 raise ::Socket::Error.new("socketpair() failed") end {% else %} @@ -196,6 +201,10 @@ module Crystal::System::Socket end fcntl(fds[0], LibC::F_SETFD, LibC::FD_CLOEXEC) fcntl(fds[1], LibC::F_SETFD, LibC::FD_CLOEXEC) + unless blocking + fcntl(fds[0], LibC::F_SETFL, fcntl(fds[0], LibC::F_GETFL) | LibC::O_NONBLOCK) + fcntl(fds[1], LibC::F_SETFL, fcntl(fds[0], LibC::F_GETFL) | LibC::O_NONBLOCK) + end end {% end %} diff --git a/src/crystal/system/wasi/socket.cr b/src/crystal/system/wasi/socket.cr index 21c24d229019..b5bc4035aec1 100644 --- a/src/crystal/system/wasi/socket.cr +++ b/src/crystal/system/wasi/socket.cr @@ -8,11 +8,7 @@ module Crystal::System::Socket alias Handle = Int32 - private def create_handle(family, type, protocol, blocking) : Handle - raise NotImplementedError.new "Crystal::System::Socket#create_handle" - end - - private def initialize_handle(fd) + private def initialize_handle(fd, blocking = nil) end # Tries to bind the socket to a local address. @@ -135,10 +131,6 @@ module Crystal::System::Socket r end - def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : {Handle, Handle} - raise NotImplementedError.new("Crystal::System::Socket.socketpair") - end - private def system_tty? LibC.isatty(fd) == 1 end diff --git a/src/crystal/system/win32/socket.cr b/src/crystal/system/win32/socket.cr index bfb82581204b..99c2002d5620 100644 --- a/src/crystal/system/win32/socket.cr +++ b/src/crystal/system/win32/socket.cr @@ -71,18 +71,20 @@ module Crystal::System::Socket initialize_extension_functions - private def create_handle(family, type, protocol, blocking) : Handle + def self.socket(family, type, protocol, blocking) : Handle + # the overlapped flag is distinct from the blocking mode in winsock (the + # latter acts like non-blocking BSD sockets); there's no downside to set the + # overlapped flag, we can do sync or async calls, and we can still change + # the blocking mode socket = LibC.WSASocketW(family, type, protocol, nil, 0, LibC::WSA_FLAG_OVERLAPPED) - if socket == LibC::INVALID_SOCKET - raise ::Socket::Error.from_wsa_error("WSASocketW") - end - - Crystal::EventLoop.current.create_completion_port LibC::HANDLE.new(socket) - + raise ::Socket::Error.from_wsa_error("WSASocketW") if socket == LibC::INVALID_SOCKET + set_blocking(socket, blocking) unless blocking socket end - private def initialize_handle(handle) + private def initialize_handle(handle, blocking = nil) + @blocking = blocking unless blocking.nil? + unless @family.unix? system_getsockopt(handle, LibC::SO_REUSEADDR, 0) do |value| if value == 0 @@ -178,12 +180,12 @@ module Crystal::System::Socket end end - def system_accept(& : Handle -> Bool) : Handle? - client_socket = create_handle(family, type, protocol, blocking) + def system_accept(& : Handle -> Bool) : {Handle, Bool}? + client_socket, blocking = Crystal::EventLoop.current.socket(family, type, protocol, nil) initialize_handle(client_socket) if yield client_socket - client_socket + {client_socket, blocking} else LibC.closesocket(client_socket) @@ -340,8 +342,14 @@ module Crystal::System::Socket end private def system_blocking=(@blocking) + Socket.set_blocking(fd, blocking) + end + + # Changes the blocking mode as per BSD sockets, has no effect on the + # overlapped flag. + def self.set_blocking(fd, blocking) mode = blocking ? 1_u32 : 0_u32 - ret = LibC.WSAIoctl(fd, LibC::FIONBIO, pointerof(mode), sizeof(UInt32), nil, 0, out bytes_returned, nil, nil) + ret = LibC.WSAIoctl(fd, LibC::FIONBIO, pointerof(mode), sizeof(UInt32), nil, 0, out _, nil, nil) raise ::Socket::Error.from_wsa_error("WSAIoctl") unless ret.zero? end @@ -357,10 +365,6 @@ module Crystal::System::Socket raise NotImplementedError.new "Crystal::System::Socket.fcntl" end - def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : {Handle, Handle} - raise NotImplementedError.new("Crystal::System::Socket.socketpair") - end - private def system_tty? LibC.GetConsoleMode(LibC::HANDLE.new(fd), out _) != 0 end diff --git a/src/lib_c/aarch64-android/c/sys/socket.cr b/src/lib_c/aarch64-android/c/sys/socket.cr index d52a5c1110ab..dc4c264edc9e 100644 --- a/src/lib_c/aarch64-android/c/sys/socket.cr +++ b/src/lib_c/aarch64-android/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 0o2000000 + SOCK_NONBLOCK = 0o0004000 alias SocklenT = UInt32 alias SaFamilyT = UShort diff --git a/src/lib_c/aarch64-linux-gnu/c/sys/socket.cr b/src/lib_c/aarch64-linux-gnu/c/sys/socket.cr index 7935dd8b3550..20bf6b0a3729 100644 --- a/src/lib_c/aarch64-linux-gnu/c/sys/socket.cr +++ b/src/lib_c/aarch64-linux-gnu/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 524288 + SOCK_NONBLOCK = 2048 alias SocklenT = UInt alias SaFamilyT = UShort diff --git a/src/lib_c/aarch64-linux-musl/c/sys/socket.cr b/src/lib_c/aarch64-linux-musl/c/sys/socket.cr index 51211386e8bd..426d3ffcd556 100644 --- a/src/lib_c/aarch64-linux-musl/c/sys/socket.cr +++ b/src/lib_c/aarch64-linux-musl/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 0o2000000 + SOCK_NONBLOCK = 0o0004000 alias SocklenT = UInt alias SaFamilyT = UShort diff --git a/src/lib_c/arm-linux-gnueabihf/c/sys/socket.cr b/src/lib_c/arm-linux-gnueabihf/c/sys/socket.cr index 4a2641d3ecd3..5a769273d6a9 100644 --- a/src/lib_c/arm-linux-gnueabihf/c/sys/socket.cr +++ b/src/lib_c/arm-linux-gnueabihf/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 524288 + SOCK_NONBLOCK = 2048 alias SocklenT = UInt alias SaFamilyT = UShort diff --git a/src/lib_c/i386-linux-gnu/c/sys/socket.cr b/src/lib_c/i386-linux-gnu/c/sys/socket.cr index 6473b6bad757..06896168dab8 100644 --- a/src/lib_c/i386-linux-gnu/c/sys/socket.cr +++ b/src/lib_c/i386-linux-gnu/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 524288 + SOCK_NONBLOCK = 2048 alias SocklenT = UInt alias SaFamilyT = UShort diff --git a/src/lib_c/i386-linux-musl/c/sys/socket.cr b/src/lib_c/i386-linux-musl/c/sys/socket.cr index 51211386e8bd..426d3ffcd556 100644 --- a/src/lib_c/i386-linux-musl/c/sys/socket.cr +++ b/src/lib_c/i386-linux-musl/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 0o2000000 + SOCK_NONBLOCK = 0o0004000 alias SocklenT = UInt alias SaFamilyT = UShort diff --git a/src/lib_c/x86_64-dragonfly/c/sys/socket.cr b/src/lib_c/x86_64-dragonfly/c/sys/socket.cr index 0d30f295ed04..25e71c5435e1 100644 --- a/src/lib_c/x86_64-dragonfly/c/sys/socket.cr +++ b/src/lib_c/x86_64-dragonfly/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 0x10000000 + SOCK_NONBLOCK = 0x20000000 alias SocklenT = UInt alias SaFamilyT = Char diff --git a/src/lib_c/x86_64-freebsd/c/sys/socket.cr b/src/lib_c/x86_64-freebsd/c/sys/socket.cr index 052b897af1a7..af8367acbbac 100644 --- a/src/lib_c/x86_64-freebsd/c/sys/socket.cr +++ b/src/lib_c/x86_64-freebsd/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 0x10000000 + SOCK_NONBLOCK = 0x20000000 alias SocklenT = UInt alias SaFamilyT = Char diff --git a/src/lib_c/x86_64-linux-gnu/c/sys/socket.cr b/src/lib_c/x86_64-linux-gnu/c/sys/socket.cr index 7935dd8b3550..20bf6b0a3729 100644 --- a/src/lib_c/x86_64-linux-gnu/c/sys/socket.cr +++ b/src/lib_c/x86_64-linux-gnu/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 524288 + SOCK_NONBLOCK = 2048 alias SocklenT = UInt alias SaFamilyT = UShort diff --git a/src/lib_c/x86_64-linux-musl/c/sys/socket.cr b/src/lib_c/x86_64-linux-musl/c/sys/socket.cr index 51211386e8bd..426d3ffcd556 100644 --- a/src/lib_c/x86_64-linux-musl/c/sys/socket.cr +++ b/src/lib_c/x86_64-linux-musl/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 0o2000000 + SOCK_NONBLOCK = 0o0004000 alias SocklenT = UInt alias SaFamilyT = UShort diff --git a/src/lib_c/x86_64-netbsd/c/sys/socket.cr b/src/lib_c/x86_64-netbsd/c/sys/socket.cr index 3d196098492f..47389efc1953 100644 --- a/src/lib_c/x86_64-netbsd/c/sys/socket.cr +++ b/src/lib_c/x86_64-netbsd/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_WR = 1 SHUT_RDWR = 2 SOCK_CLOEXEC = 0x10000000 + SOCK_NONBLOCK = 0x20000000 alias SocklenT = UInt alias SaFamilyT = UInt8 diff --git a/src/lib_c/x86_64-openbsd/c/sys/socket.cr b/src/lib_c/x86_64-openbsd/c/sys/socket.cr index e812ddca2236..90ab24663d0e 100644 --- a/src/lib_c/x86_64-openbsd/c/sys/socket.cr +++ b/src/lib_c/x86_64-openbsd/c/sys/socket.cr @@ -27,6 +27,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 0x8000 + SOCK_NONBLOCK = 0x4000 alias SocklenT = UInt alias SaFamilyT = Char diff --git a/src/lib_c/x86_64-solaris/c/sys/socket.cr b/src/lib_c/x86_64-solaris/c/sys/socket.cr index 0031c66d0da0..7db17283d926 100644 --- a/src/lib_c/x86_64-solaris/c/sys/socket.cr +++ b/src/lib_c/x86_64-solaris/c/sys/socket.cr @@ -28,6 +28,7 @@ lib LibC SHUT_RDWR = 2 SHUT_WR = 1 SOCK_CLOEXEC = 0x080000 + SOCK_NONBLOCK = 0x100000 alias SocklenT = UInt32 alias SaFamilyT = UInt16 diff --git a/src/socket.cr b/src/socket.cr index b862c30e2f9e..a9833dc255d4 100644 --- a/src/socket.cr +++ b/src/socket.cr @@ -45,40 +45,46 @@ class Socket < IO # Creates a TCP socket. Consider using `TCPSocket` or `TCPServer` unless you # need full control over the socket. - def self.tcp(family : Family, blocking = false) : self + def self.tcp(family : Family, blocking = nil) : self new(family, Type::STREAM, Protocol::TCP, blocking) end # Creates an UDP socket. Consider using `UDPSocket` unless you need full # control over the socket. - def self.udp(family : Family, blocking = false) + def self.udp(family : Family, blocking = nil) new(family, Type::DGRAM, Protocol::UDP, blocking) end # Creates an UNIX socket. Consider using `UNIXSocket` or `UNIXServer` unless # you need full control over the socket. - def self.unix(type : Type = Type::STREAM, blocking = false) : self + def self.unix(type : Type = Type::STREAM, blocking = nil) : self new(Family::UNIX, type, blocking: blocking) end - def initialize(family : Family, type : Type, protocol : Protocol = Protocol::IP, blocking = false) + def initialize(family : Family, type : Type, protocol : Protocol = Protocol::IP, blocking = nil) # This method is `#initialize` instead of `.new` because it is used as super # constructor from subclasses. - - fd = create_handle(family, type, protocol, blocking) - initialize(fd, family, type, protocol, blocking) + fd, blocking = Crystal::EventLoop.current.socket(family, type, protocol, blocking) + initialize(handle: fd, family: family, type: type, protocol: protocol, blocking: blocking) + self.sync = true end # Creates a Socket from an existing socket file descriptor / handle. - def initialize(fd, @family : Family, @type : Type, @protocol : Protocol = Protocol::IP, blocking = false) - @volatile_fd = Atomic.new(fd) - @closed = false - initialize_handle(fd) - + def initialize(fd, @family : Family, @type : Type, @protocol : Protocol = Protocol::IP, blocking = nil) + initialize(handle: fd, family: family, type: type, protocol: protocol) + blocking = Crystal::EventLoop.default_socket_blocking? if blocking.nil? + self.blocking = blocking unless blocking self.sync = true - unless blocking - self.blocking = false - end + end + + # :nodoc: + # + # Internal constructor to initialize the bare socket. The *blocking* arg is + # purely informational. + def initialize(*, handle, @family, @type, @protocol, blocking = nil) + @volatile_fd = Atomic.new(handle) + @closed = false + initialize_handle(handle, blocking) end # Connects the socket to a remote host:port. @@ -206,8 +212,13 @@ class Socket < IO # end # ``` def accept? : Socket? - if client_fd = system_accept - sock = Socket.new(client_fd, family, type, protocol, blocking) + if rs = Crystal::EventLoop.current.accept(self) + sock = Socket.new(handle: rs[0], family: family, type: type, protocol: protocol, blocking: rs[1]) + unless (blocking = self.blocking) == rs[1] + # FIXME: unlike the overloads in TCPServer and UNIXServer, this version + # carries the blocking mode from the server socket to the client socket + sock.blocking = blocking + end sock.sync = sync? sock end diff --git a/src/socket/tcp_server.cr b/src/socket/tcp_server.cr index c5cf3e1fcef0..287b5d394250 100644 --- a/src/socket/tcp_server.cr +++ b/src/socket/tcp_server.cr @@ -108,8 +108,8 @@ class TCPServer < TCPSocket # end # ``` def accept? : TCPSocket? - if client_fd = system_accept - sock = TCPSocket.new(fd: client_fd, family: family, type: type, protocol: protocol) + if rs = Crystal::EventLoop.current.accept(self) + sock = TCPSocket.new(handle: rs[0], family: family, type: type, protocol: protocol, blocking: rs[1]) sock.sync = sync? sock end diff --git a/src/socket/tcp_socket.cr b/src/socket/tcp_socket.cr index 4edcb3d08e5f..95c6521237cc 100644 --- a/src/socket/tcp_socket.cr +++ b/src/socket/tcp_socket.cr @@ -15,7 +15,7 @@ require "./ip_socket" # ``` class TCPSocket < IPSocket # Creates a new `TCPSocket`, waiting to be connected. - def self.new(family : Family = Family::INET, blocking = false) + def self.new(family : Family = Family::INET, blocking = nil) super(family, Type::STREAM, Protocol::TCP, blocking) end @@ -24,9 +24,7 @@ class TCPSocket < IPSocket # You may limit the DNS resolution time with `dns_timeout` and limit the # connection time to the remote server with `connect_timeout`. Both values # must be in seconds (integers or floats). - # - # NOTE: *dns_timeout* is currently only supported on Windows. - def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, blocking = false) + def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, blocking = nil) Addrinfo.tcp(host, port, timeout: dns_timeout) do |addrinfo| super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking) connect(addrinfo, timeout: connect_timeout) do |error| @@ -36,16 +34,17 @@ class TCPSocket < IPSocket end end - protected def initialize(family : Family, type : Type, protocol : Protocol = Protocol::IP, blocking = false) - super family, type, protocol, blocking + protected def initialize(family : Family, type : Type, protocol : Protocol = Protocol::IP) + super family, type, protocol end - protected def initialize(fd : Handle, family : Family, type : Type, protocol : Protocol = Protocol::IP, blocking = false) - super fd, family, type, protocol, blocking + # constructor for TCPServer#accept? + protected def initialize(*, handle, family, type, protocol, blocking) + super(handle: handle, family: family, type: type, protocol: protocol, blocking: blocking) end # Creates a TCPSocket from an already configured raw file descriptor - def initialize(*, fd : Handle, family : Family = Family::INET, blocking = false) + def initialize(*, fd : Handle, family : Family = Family::INET, blocking = nil) super fd, family, Type::STREAM, Protocol::TCP, blocking end diff --git a/src/socket/unix_server.cr b/src/socket/unix_server.cr index e2f9b07b6157..19d09a43aed0 100644 --- a/src/socket/unix_server.cr +++ b/src/socket/unix_server.cr @@ -77,8 +77,8 @@ class UNIXServer < UNIXSocket # Returns the client socket or `nil` if the server is closed after invoking # this method. def accept? : UNIXSocket? - if client_fd = system_accept - sock = UNIXSocket.new(fd: client_fd, type: type, path: @path) + if rs = Crystal::EventLoop.current.accept(self) + sock = UNIXSocket.new(handle: rs[0], type: type, path: @path, blocking: rs[1]) sock.sync = sync? sock end diff --git a/src/socket/unix_socket.cr b/src/socket/unix_socket.cr index 914a2a62fd1d..fd864af4320d 100644 --- a/src/socket/unix_socket.cr +++ b/src/socket/unix_socket.cr @@ -32,6 +32,12 @@ class UNIXSocket < Socket super family, type, Protocol::IP end + # Constructor for #pair and UNIXServer#accept? + protected def initialize(*, handle : Handle, type : Type = Type::STREAM, path : Path | String? = nil, blocking : Bool = nil) + @path = path.to_s + super handle: handle, family: Family::UNIX, type: type, protocol: Protocol::IP, blocking: blocking + end + # Creates a UNIXSocket from an already configured raw file descriptor def initialize(*, fd : Handle, type : Type = Type::STREAM, path : Path | String? = nil) @path = path.to_s @@ -70,9 +76,12 @@ class UNIXSocket < Socket # left.gets # => "message" # ``` def self.pair(type : Type = Type::STREAM) : {UNIXSocket, UNIXSocket} - Crystal::System::Socket - .socketpair(type, Protocol::IP) - .map { |fd| UNIXSocket.new(fd: fd, type: type) } + fds, blocking = Crystal::EventLoop.current.socketpair(type, Protocol::IP) + fds.map do |fd| + sock = UNIXSocket.new(handle: fd, type: type, blocking: blocking) + sock.sync = true + sock + end end # Creates a pair of unnamed UNIX sockets (see `pair`) and yields them to the