Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
require "spec"

class Crystal::EventLoop::FakeLoop < Crystal::EventLoop::Polling
def self.default_blocking
false
end

getter operations = [] of {Symbol, Int32, Arena::Index | Bool}

private def system_run(blocking : Bool, & : Fiber ->) : Nil
Expand Down
100 changes: 27 additions & 73 deletions spec/std/file_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -48,85 +48,39 @@ describe "File" do
end

describe "blocking" do
it "opens regular file as blocking" do
with_tempfile("regular") do |path|
File.open(path, "w") do |file|
file.blocking.should be_true
end

File.open(path, "w", blocking: nil) do |file|
file.blocking.should be_true
end
end
end

it "opens regular file as non-blocking" do
with_tempfile("regular") do |path|
File.open(path, "w", blocking: false) do |file|
file.blocking.should be_false
end
end
end

{% if flag?(:unix) %}
if File.exists?("/dev/tty")
it "opens character device" do
File.open("/dev/tty", "r") do |file|
file.blocking.should be_true
{% if flag?(:unix) && LibC.has_method?(:mkfifo) %}
# interpreter doesn't support threads yet (#14287)
pending_interpreted "opens fifo file" do
path = File.tempname("chardev")
ret = LibC.mkfifo(path, File::DEFAULT_CREATE_PERMISSIONS)
raise RuntimeError.from_errno("mkfifo") unless ret == 0

msg = "a message to pass through the fifo"

# open(2) will block when opening a fifo file until another thread or
# process also opened the file in the opposite mode
{% if flag?(:preview_mt) %}
# the event loops should avoid blocking the current thread when
# *blocking* is false
spawn do
File.open(path, "w", blocking: false) { |w| w << msg }
end

File.open("/dev/tty", "r", blocking: false) do |file|
file.blocking.should be_false
{% else %}
# we must open the file in a thread to not block the fibers in the
# current thread
new_thread do
File.open(path, "w") { |w| w << msg }
end
{% end %}

File.open("/dev/tty", "r", blocking: nil) do |file|
file.blocking.should be_false
end
rescue File::Error
# The TTY may not be available (e.g. Docker CI)
# we can block the current thread because we (will) have a writer
File.open(path, "r", blocking: false) do |file|
file.gets_to_end.should eq(msg)
end
ensure
File.delete(path) if path && File.exists?(path)
end

{% if LibC.has_method?(:mkfifo) %}
# interpreter doesn't support threads yet (#14287)
pending_interpreted "opens fifo file as non-blocking" do
path = File.tempname("chardev")
ret = LibC.mkfifo(path, File::DEFAULT_CREATE_PERMISSIONS)
raise RuntimeError.from_errno("mkfifo") unless ret == 0

# FIXME: open(2) will block when opening a fifo file until another
# thread or process also opened the file; we should pass
# O_NONBLOCK to the open(2) call itself, not afterwards
file = nil
new_thread { file = File.new(path, "w", blocking: nil) }

begin
File.open(path, "r", blocking: false) do |file|
file.blocking.should be_false
end
ensure
File.delete(path)
file.try(&.close)
end
end
{% end %}
{% end %}

it "reads non-blocking file" do
File.open(datapath("test_file.txt"), "r", blocking: false) do |f|
f.gets_to_end.should eq("Hello World\n" * 20)
end
end

it "writes and reads large non-blocking file" do
with_tempfile("non-blocking-io.txt") do |path|
File.open(path, "w+", blocking: false) do |f|
f.puts "Hello World\n" * 40000
f.pos = 0
f.gets_to_end.should eq("Hello World\n" * 40000)
end
end
end
end

it "reads entire file" do
Expand Down
4 changes: 4 additions & 0 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ abstract class Crystal::EventLoop
{% end %}
end

def self.default_blocking : Bool
backend_class.default_blocking
end

# Creates an event loop instance
def self.create : self
backend_class.new
Expand Down
9 changes: 9 additions & 0 deletions src/crystal/event_loop/file_descriptor.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
abstract class Crystal::EventLoop
module FileDescriptor
# Opens a file named *filename* from the disk.
#
# Blocks the current fiber until the file has been opened. Avoids blocking
# the current thread if possible, especially whebn *blocking* is `false` or
# `nil`.
#
# Returns the system file descriptor/handle or a system error.
abstract def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno | WinError

# Reads at least one byte from the file descriptor into *slice*.
#
# Blocks the current fiber if no data is available for reading, continuing
Expand Down
43 changes: 42 additions & 1 deletion src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
@timer_packet : LibC::HANDLE?
@timer_key : System::IOCP::CompletionKey?

def self.default_blocking : Bool
false
end

def initialize
@mutex = Thread::Mutex.new
@timers = Timers(Timer).new
Expand Down Expand Up @@ -216,6 +220,29 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
FiberEvent.new(:select_timeout, fiber)
end

def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | WinError
# we disregard the *blocking* arg because it's always `true` by default
# because of UNIX behavior, but we still want to read and write files async
access, disposition, attributes = System::File.posix_to_open_opts(flags, permissions, blocking: false)

handle = LibC.CreateFileW(
System.to_wstr(filename),
access,
LibC::DEFAULT_SHARE_MODE, # UNIX semantics
nil,
disposition,
attributes,
LibC::HANDLE.null
)

if handle == LibC::INVALID_HANDLE_VALUE
WinError.value
else
create_completion_port(handle)
handle.address
end
end

def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
System::IOCP.overlapped_operation(file_descriptor, "ReadFile", file_descriptor.read_timeout) do |overlapped|
ret = LibC.ReadFile(file_descriptor.windows_handle, slice, slice.size, out byte_count, overlapped)
Expand All @@ -228,10 +255,24 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
System::IOCP.overlapped_operation(file_descriptor, "WriteFile", file_descriptor.write_timeout, writing: true) do |overlapped|
bytes_written = System::IOCP.overlapped_operation(file_descriptor, "WriteFile", file_descriptor.write_timeout, writing: true) do |overlapped|
overlapped.offset = UInt64::MAX if file_descriptor.system_append?

ret = LibC.WriteFile(file_descriptor.windows_handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end.to_i32

# The overlapped offset forced a write to the end of the file, but unlike
# synchronous writes, an asynchronous write incorrectly updates the file
# pointer: it merely adds the number of written bytes to the current
# position, disregarding that the offset might have changed it.
#
# We could seek before the async write (it works), but a concurrent fiber or
# parallel thread could also seek and we'd end up overwriting instead of
# appending; we need both the offset + explicit seek.
file_descriptor.system_seek(0, IO::Seek::End) if file_descriptor.system_append?

bytes_written
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
Expand Down
51 changes: 49 additions & 2 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ require "./libevent/event"

# :nodoc:
class Crystal::EventLoop::LibEvent < Crystal::EventLoop
def self.default_blocking : Bool
false
end

private getter(event_base) { Crystal::EventLoop::LibEvent::Event::Base.new }

def after_fork_before_exec : Nil
Expand Down Expand Up @@ -108,6 +112,46 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno
filename.check_no_null_byte

fd = 0
flags |= LibC::O_CLOEXEC

# We can't reliably detect without significant overhead whether *filename*
# might block for a while before calling open; while O_NONBLOCK has no
# effect on regular disk files, special file types are a different story.
# Open with O_NONBLOCK will fail with ENXIO for O_WRONLY (no connected
# reader) but it will always succeed for O_RDONLY (regardless of a connected
# writer or not), then any attempt to read will return EOF, leaving no means
# to wait until a writer connects.
#
# We thus rely on the *blocking* arg: when false the file might be a special
# file type, so we check it; if it's a fifo (named pipe) or a character
# device, we open in another thread so we don't risk blocking the current
# thread (and thus other fibers) until a reader or writer is also connected.
#
# We need preview_mt to safely re-enqueue the current fiber from the thread.
{% if flag?(:preview_mt) && !flag?(:interpreted) %}
if !blocking && System::File.special_file_type?(filename)
fd, errno = System::File.async_open(filename, flags, permissions)
return errno if fd == -1
else
fd = LibC.open(filename, flags, permissions)
return Errno.value if fd == -1
end
{% else %}
fd = LibC.open(filename, flags, permissions)
return Errno.value if fd == -1
{% end %}

# Always set O_NONBLOCK (it's a requirement).
status_flags = System::FileDescriptor.fcntl(fd, LibC::F_GETFL)
System::FileDescriptor.fcntl(fd, LibC::F_SETFL, status_flags | LibC::O_NONBLOCK)

fd
end

def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
evented_read(file_descriptor, "Error reading file_descriptor") do
LibC.read(file_descriptor.fd, slice, slice.size).tap do |return_code|
Expand Down Expand Up @@ -217,7 +261,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
loop do
client_fd =
{% if LibC.has_method?(:accept4) %}
LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC)
LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC | LibC::SOCK_NONBLOCK)
{% else %}
# we may fail to set FD_CLOEXEC between `accept` and `fcntl` but we
# can't call `Crystal::System::Socket.lock_read` because the socket
Expand All @@ -228,7 +272,10 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
# could change the socket back to blocking mode between the condition
# check and the `accept` call.
fd = LibC.accept(socket.fd, nil, nil)
Crystal::System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) unless fd == -1
unless fd == -1
System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC)
System::Socket.fcntl(fd, LibC::F_SETFL, System::Socket.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK)
end
fd
{% end %}

Expand Down
52 changes: 49 additions & 3 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ end
# before suspending the fiber, then after resume it will raise
# `IO::TimeoutError` if the event timed out, and continue otherwise.
abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
def self.default_blocking : Bool
false
end

# The generational arena:
#
# 1. decorrelates the fd from the IO since the evloop only really cares about
Expand Down Expand Up @@ -145,6 +149,45 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop

# file descriptor interface, see Crystal::EventLoop::FileDescriptor

def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno
filename.check_no_null_byte

fd = 0
flags |= LibC::O_CLOEXEC

# We can't reliably detect without significant overhead whether *filename*
# might block for a while before calling open; while O_NONBLOCK has no
# effect on regular disk files, special file types are a different story.
# Open with O_NONBLOCK will fail with ENXIO for O_WRONLY (no connected
# reader) but it will always succeed for O_RDONLY (regardless of a connected
# writer or not), then any attempt to read will return EOF, leaving no means
# to wait until a writer connects.
#
# We thus rely on the *blocking* arg: when false the file might be a special
# file type, so we check it; if it's a fifo (named pipe) or a character
# device, we open in another thread so we don't risk blocking the current
# thread (and thus other fibers) until a reader or writer is also connected.
#
# We need preview_mt to safely re-enqueue the current fiber from the thread.
{% if flag?(:preview_mt) && !flag?(:interpreted) %}
if !blocking && System::File.special_file_type?(filename)
fd, errno = System::File.async_open(filename, flags, permissions)
return errno if fd == -1
else
fd = LibC.open(filename, flags, permissions)
return Errno.value if fd == -1
end
{% else %}
fd = LibC.open(filename, flags, permissions)
return Errno.value if fd == -1
{% end %}

# Always set O_NONBLOCK (it's a requirement).
System::FileDescriptor.fcntl(fd, LibC::F_SETFL, System::FileDescriptor.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK)

fd
end

def read(file_descriptor : System::FileDescriptor, slice : Bytes) : Int32
size = evented_read(file_descriptor, slice, file_descriptor.@read_timeout)

Expand Down Expand Up @@ -227,7 +270,7 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
loop do
client_fd =
{% if LibC.has_method?(:accept4) %}
LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC)
LibC.accept4(socket.fd, nil, nil, LibC::SOCK_CLOEXEC | LibC::SOCK_NONBLOCK)
{% else %}
# we may fail to set FD_CLOEXEC between `accept` and `fcntl` but we
# can't call `Crystal::System::Socket.lock_read` because the socket
Expand All @@ -237,9 +280,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
# we could lock when `socket.blocking?` is false, but another thread
# could change the socket back to blocking mode between the condition
# check and the `accept` call.
LibC.accept(socket.fd, nil, nil).tap do |fd|
System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC) unless fd == -1
fd = LibC.accept(socket.fd, nil, nil)
unless fd == -1
System::Socket.fcntl(fd, LibC::F_SETFD, LibC::FD_CLOEXEC)
System::Socket.fcntl(fd, LibC::F_SETFL, System::Socket.fcntl(fd, LibC::F_GETFL) | LibC::O_NONBLOCK)
end
fd
{% end %}

return client_fd unless client_fd == -1
Expand Down
8 changes: 8 additions & 0 deletions src/crystal/event_loop/wasi.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# :nodoc:
class Crystal::EventLoop::Wasi < Crystal::EventLoop
def self.default_blocking
false
end

# Runs the event loop.
def run(blocking : Bool) : Bool
raise NotImplementedError.new("Crystal::Wasi::EventLoop.run")
Expand Down Expand Up @@ -28,6 +32,10 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
raise NotImplementedError.new("Crystal::Wasi::EventLoop.create_fd_read_event")
end

def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno | WinError
raise NotImplementedError.new("Crystal::Wasi::EventLoop#open")
end

def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
evented_read(file_descriptor, "Error reading file_descriptor") do
LibC.read(file_descriptor.fd, slice, slice.size).tap do |return_code|
Expand Down
Loading