Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
require "spec"

class Crystal::EventLoop::FakeLoop < Crystal::EventLoop::Polling
def self.default_socket_blocking?
false
end

getter operations = [] of {Symbol, Int32, Arena::Index | Bool}

private def system_run(blocking : Bool, & : Fiber ->) : Nil
Expand Down
4 changes: 4 additions & 0 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ abstract class Crystal::EventLoop
backend_class.new
end

def self.default_socket_blocking? : Bool
backend_class.default_socket_blocking?
end

@[AlwaysInline]
def self.current : self
{% if flag?(:execution_context) %}
Expand Down
17 changes: 16 additions & 1 deletion src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ require "./iocp/*"

# :nodoc:
class Crystal::EventLoop::IOCP < Crystal::EventLoop
def self.default_socket_blocking?
true
end

@waitable_timer : System::WaitableTimer?
@timer_packet : LibC::HANDLE?
@timer_key : System::IOCP::CompletionKey?
Expand Down Expand Up @@ -289,6 +293,17 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
file_descriptor.file_descriptor_close
end

def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool?) : {::Socket::Handle, Bool}
blocking = true if blocking.nil?
fd = System::Socket.socket(family, type, protocol, blocking)
create_completion_port LibC::HANDLE.new(fd)
{fd, blocking}
end

def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : Tuple({::Socket::Handle, ::Socket::Handle}, Bool)
raise NotImplementedError.new("Crystal::EventLoop::IOCP#socketpair")
end

private def wsa_buffer(bytes)
wsabuf = LibC::WSABUF.new
wsabuf.len = bytes.size
Expand Down Expand Up @@ -376,7 +391,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
end
end

def accept(socket : ::Socket) : ::Socket::Handle?
def accept(socket : ::Socket) : {::Socket::Handle, Bool}?
socket.system_accept do |client_handle|
address_size = sizeof(LibC::SOCKADDR_STORAGE) + 16

Expand Down
25 changes: 21 additions & 4 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ require "./libevent/event"

# :nodoc:
class Crystal::EventLoop::LibEvent < Crystal::EventLoop
def self.default_socket_blocking?
false
end

private getter(event_base) { Crystal::EventLoop::LibEvent::Event::Base.new }

def after_fork_before_exec : Nil
Expand Down Expand Up @@ -166,6 +170,16 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
file_descriptor.file_descriptor_close
end

def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool?) : {::Socket::Handle, Bool}
socket = System::Socket.socket(family, type, protocol, !!blocking)
{socket, !!blocking}
end

def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : Tuple({::Socket::Handle, ::Socket::Handle}, Bool)
socket = System::Socket.socketpair(type, protocol, blocking: false)
{socket, false}
end

def read(socket : ::Socket, slice : Bytes) : Int32
evented_read(socket, "Error reading socket") do
LibC.recv(socket.fd, slice, slice.size, 0).to_i32
Expand Down Expand Up @@ -231,11 +245,11 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def accept(socket : ::Socket) : ::Socket::Handle?
def accept(socket : ::Socket) : {::Socket::Handle, Bool}?
loop do
client_fd =
{% if LibC.has_method?(:accept4) %}
LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC)
LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC | LibC::SOCK_NONBLOCK)
{% else %}
# we may fail to set FD_CLOEXEC between `accept` and `fcntl` but we
# can't call `Crystal::System::Socket.lock_read` because the socket
Expand All @@ -246,7 +260,10 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
# could change the socket back to blocking mode between the condition
# check and the `accept` call.
fd = LibC.accept(socket.fd, nil, nil)
Crystal::System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) unless fd == -1
unless fd == -1
System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC)
System::Socket.fcntl(fd, LibC::F_SETFL, System::Socket.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK)
end
fd
{% end %}

Expand All @@ -262,7 +279,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
raise ::Socket::Error.from_errno("accept")
end
else
return client_fd
return {client_fd, false}
end
end
end
Expand Down
25 changes: 21 additions & 4 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ end
# before suspending the fiber, then after resume it will raise
# `IO::TimeoutError` if the event timed out, and continue otherwise.
abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
def self.default_socket_blocking?
false
end

# The generational arena:
#
# 1. decorrelates the fd from the IO since the evloop only really cares about
Expand Down Expand Up @@ -226,6 +230,16 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop

# socket interface, see Crystal::EventLoop::Socket

def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool?) : {::Socket::Handle, Bool}
socket = System::Socket.socket(family, type, protocol, !!blocking)
{socket, !!blocking}
end

def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : Tuple({::Socket::Handle, ::Socket::Handle}, Bool)
socket = System::Socket.socketpair(type, protocol, blocking: false)
{socket, false}
end

def read(socket : ::Socket, slice : Bytes) : Int32
size = evented_read(socket, slice, socket.@read_timeout)
raise IO::Error.from_errno("read", target: socket) if size == -1
Expand All @@ -250,11 +264,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
end

def accept(socket : ::Socket) : ::Socket::Handle?
def accept(socket : ::Socket) : {::Socket::Handle, Bool}?
loop do
client_fd =
{% if LibC.has_method?(:accept4) %}
LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC)
LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC | LibC::SOCK_NONBLOCK)
{% else %}
# we may fail to set FD_CLOEXEC between `accept` and `fcntl` but we
# can't call `Crystal::System::Socket.lock_read` because the socket
Expand All @@ -265,11 +279,14 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
# could change the socket back to blocking mode between the condition
# check and the `accept` call.
LibC.accept(socket.fd, nil, nil).tap do |fd|
System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) unless fd == -1
unless fd == -1
System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC)
System::Socket.fcntl(fd, LibC::F_SETFL, System::Socket.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK)
end
end
{% end %}

return client_fd unless client_fd == -1
return {client_fd, false} unless client_fd == -1
return if socket.closed?

if Errno.value == Errno::EAGAIN
Expand Down
10 changes: 9 additions & 1 deletion src/crystal/event_loop/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@

abstract class Crystal::EventLoop
module Socket
# Creates a new socket file descriptor or handle and returns it, along with
# whether the blocking flag has been set.
abstract def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool?) : {::Socket::Handle, Bool}

# Creates a pair of UNIX socket file descriptors or handles and returns
# them, along with whether the blocking mode has been set.
abstract def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : Tuple({::Socket::Handle, ::Socket::Handle}, Bool)

# Reads at least one byte from the socket into *slice*.
#
# Blocks the current fiber if no data is available for reading, continuing
Expand Down Expand Up @@ -37,7 +45,7 @@ abstract class Crystal::EventLoop
# becomes available. Otherwise returns immediately.
#
# Returns a handle to the socket for the new connection.
abstract def accept(socket : ::Socket) : ::Socket::Handle?
abstract def accept(socket : ::Socket) : {::Socket::Handle, Bool}?

# Opens a connection on *socket* to the target *address*.
#
Expand Down
12 changes: 12 additions & 0 deletions src/crystal/event_loop/wasi.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# :nodoc:
class Crystal::EventLoop::Wasi < Crystal::EventLoop
def self.default_socket_blocking?
false
end

# Runs the event loop.
def run(blocking : Bool) : Bool
raise NotImplementedError.new("Crystal::Wasi::EventLoop.run")
Expand Down Expand Up @@ -73,6 +77,14 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
file_descriptor.file_descriptor_close
end

def socket(family : ::Socket::Family, type : ::Socket::Type, protocol : ::Socket::Protocol) : {::Socket::Handle, Bool}
raise NotImplementedError.new("Crystal::EventLoop::Wasi#socket")
end

def socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool) : {Handle, Handle}
raise NotImplementedError.new("Crystal::EventLoop::Wasi#socketpair")
end

def read(socket : ::Socket, slice : Bytes) : Int32
evented_read(socket, "Error reading socket") do
LibC.recv(socket.fd, slice, slice.size, 0).to_i32
Expand Down
9 changes: 1 addition & 8 deletions src/crystal/system/socket.cr
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
require "../event_loop/socket"

module Crystal::System::Socket
# Creates a file descriptor / socket handle
# private def create_handle(family, type, protocol, blocking) : Handle

# Initializes a file descriptor / socket handle for use with Crystal Socket
# private def initialize_handle(fd)

Expand All @@ -17,10 +14,6 @@ module Crystal::System::Socket

# private def system_listen(backlog)

private def system_accept
event_loop.accept(self)
end

private def system_send_to(bytes : Bytes, addr : ::Socket::Address)
event_loop.send_to(self, bytes, addr)
end
Expand Down Expand Up @@ -79,7 +72,7 @@ module Crystal::System::Socket

# def self.fcntl(fd, cmd, arg = 0)

# def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : {Handle, Handle}
# def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool) : {Handle, Handle}

private def system_read(slice : Bytes) : Int32
event_loop.read(self, slice)
Expand Down
19 changes: 14 additions & 5 deletions src/crystal/system/unix/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,25 @@ module Crystal::System::Socket

alias Handle = Int32

private def create_handle(family, type, protocol, blocking) : Handle
def self.socket(family, type, protocol, blocking) : Handle
{% if LibC.has_constant?(:SOCK_CLOEXEC) %}
fd = LibC.socket(family, type.value | LibC::SOCK_CLOEXEC, protocol)
flags = type.value | LibC::SOCK_CLOEXEC
flags |= LibC::SOCK_NONBLOCK unless blocking
fd = LibC.socket(family, flags, protocol)
raise ::Socket::Error.from_errno("Failed to create socket") if fd == -1
fd
{% else %}
Process.lock_read do
fd = LibC.socket(family, type, protocol)
raise ::Socket::Error.from_errno("Failed to create socket") if fd == -1
Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC)
Socket.fcntl(fd, LibC::F_SETFL, Socket.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK) unless blocking
fd
end
{% end %}
end

private def initialize_handle(fd)
private def initialize_handle(fd, blocking = nil)
{% if Crystal::EventLoop.has_constant?(:Polling) %}
@__evloop_data = Crystal::EventLoop::Polling::Arena::INVALID_INDEX
{% end %}
Expand Down Expand Up @@ -182,11 +185,13 @@ module Crystal::System::Socket
r
end

def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : {Handle, Handle}
def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool) : {Handle, Handle}
fds = uninitialized Handle[2]

{% if LibC.has_constant?(:SOCK_CLOEXEC) %}
if LibC.socketpair(::Socket::Family::UNIX, type.value | LibC::SOCK_CLOEXEC, protocol, fds) == -1
flags = type.value | LibC::SOCK_CLOEXEC
flags |= LibC::SOCK_NONBLOCK unless blocking
if LibC.socketpair(::Socket::Family::UNIX, flags, protocol, fds) == -1
raise ::Socket::Error.new("socketpair() failed")
end
{% else %}
Expand All @@ -196,6 +201,10 @@ module Crystal::System::Socket
end
fcntl(fds[0], LibC::F_SETFD, LibC::FD_CLOEXEC)
fcntl(fds[1], LibC::F_SETFD, LibC::FD_CLOEXEC)
unless blocking
fcntl(fds[0], LibC::F_SETFL, fcntl(fds[0], LibC::F_GETFL) | LibC::O_NONBLOCK)
fcntl(fds[1], LibC::F_SETFL, fcntl(fds[0], LibC::F_GETFL) | LibC::O_NONBLOCK)
end
end
{% end %}

Expand Down
10 changes: 1 addition & 9 deletions src/crystal/system/wasi/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ module Crystal::System::Socket

alias Handle = Int32

private def create_handle(family, type, protocol, blocking) : Handle
raise NotImplementedError.new "Crystal::System::Socket#create_handle"
end

private def initialize_handle(fd)
private def initialize_handle(fd, blocking = nil)
end

# Tries to bind the socket to a local address.
Expand Down Expand Up @@ -135,10 +131,6 @@ module Crystal::System::Socket
r
end

def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol) : {Handle, Handle}
raise NotImplementedError.new("Crystal::System::Socket.socketpair")
end

private def system_tty?
LibC.isatty(fd) == 1
end
Expand Down
Loading
Loading