Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions spec/std/file/tempfile_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ describe Crystal::System::File do
it "creates random file name" do
with_tempfile "random-path" do |tempdir|
Dir.mkdir tempdir
fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14]))
fd, path, _ = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14]))
path.should eq Path[tempdir, "A789abcdeZ"].to_s
ensure
IO::FileDescriptor.new(fd).close if fd
Expand All @@ -209,7 +209,7 @@ describe Crystal::System::File do
Dir.mkdir tempdir
existing_path = Path[tempdir, "A789abcdeZ"]
File.touch existing_path
fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]))
fd, path, _ = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]))
path.should eq File.join(tempdir, "AfghijklmZ")
ensure
IO::FileDescriptor.new(fd).close if fd
Expand All @@ -221,7 +221,7 @@ describe Crystal::System::File do
Dir.mkdir tempdir
File.touch Path[tempdir, "A789abcdeZ"]
expect_raises(File::AlreadyExistsError, "Error creating temporary file") do
fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14]))
fd, path, _ = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14]))
ensure
IO::FileDescriptor.new(fd).close if fd
end
Expand Down
22 changes: 14 additions & 8 deletions spec/std/file_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,25 @@ describe "File" do
ret = LibC.mkfifo(path, File::DEFAULT_CREATE_PERMISSIONS)
raise RuntimeError.from_errno("mkfifo") unless ret == 0

# FIXME: open(2) will block when opening a fifo file until another
# thread or process also opened the file; we should pass
# O_NONBLOCK to the open(2) call itself, not afterwards
file = nil
new_thread { file = File.new(path, "w", blocking: nil) }
# open(2) blocks when opening a pipe/fifo or chardev file until
# another thread or process also opened the file in the opposite mode
w = nil
{% if flag?(:preview_mt) %}
# the evloop handles opening the fifo file without blocking the
# current thread (only the current fiber)
spawn { w = File.open(path, "w", blocking: nil) }
{% else %}
# we must explicitly spawn a thread
new_thread { w = File.new(path, "w") }
{% end %}

begin
File.open(path, "r", blocking: false) do |file|
file.blocking.should be_false
File.open(path, "r", blocking: false) do |r|
r.blocking.should be_false
end
ensure
File.delete(path)
file.try(&.close)
w.try(&.close)
end
end
{% end %}
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/event_loop/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ abstract class Crystal::EventLoop
# `nil`.
#
# Returns the system file descriptor or handle, or a system error.
abstract def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno | WinError
abstract def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | Errno | WinError

# Reads at least one byte from the file descriptor into *slice*.
#
Expand Down
5 changes: 3 additions & 2 deletions src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
FiberEvent.new(:select_timeout, fiber)
end

def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | WinError
def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | WinError
access, disposition, attributes = System::File.posix_to_open_opts(flags, permissions, blocking)

handle = LibC.CreateFileW(
Expand All @@ -239,7 +239,8 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
if handle == LibC::INVALID_HANDLE_VALUE
WinError.value
else
handle.address
create_completion_port(handle) unless blocking
{handle.address, !!blocking}
end
end

Expand Down
44 changes: 37 additions & 7 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,46 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno
def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | Errno
path.check_no_null_byte

fd = LibC.open(path, flags | LibC::O_CLOEXEC, permissions)

if fd == -1
Errno.value
else
fd
fd = 0
flags |= LibC::O_CLOEXEC
blocking = !System::File.special_type?(path) if blocking.nil?

# We can't reliably detect without significant overhead whether the file at
# *path* might block for a while before calling open; while O_NONBLOCK has
# no effect on regular disk files, special file types are a different story.
# Open with O_NONBLOCK will fail with ENXIO for O_WRONLY (no connected
# reader) but it will always succeed for O_RDONLY (regardless of a connected
# writer or not), then any attempt to read will return EOF, leaving no means
# to wait until a writer connects.
#
# We thus rely on the *blocking* arg: when false the file might be a special
# file type, so we check it; if it's a fifo (named pipe) or a character
# device, we open in another thread so we don't risk blocking the current
# thread (and thus other fibers) until a reader or writer is also connected.
#
# We need preview_mt to safely re-enqueue the current fiber from the thread.
{% if flag?(:preview_mt) && !flag?(:interpreted) %}
if blocking
fd = LibC.open(path, flags, permissions)
return Errno.value if fd == -1
else
fd, errno = System::File.async_open(path, flags, permissions)
return errno if fd == -1
end
{% else %}
fd = LibC.open(path, flags, permissions)
return Errno.value if fd == -1
{% end %}

unless blocking
status_flags = System::FileDescriptor.fcntl(fd, LibC::F_GETFL)
System::FileDescriptor.fcntl(fd, LibC::F_SETFL, status_flags | LibC::O_NONBLOCK)
end

{fd, blocking}
end

def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
Expand Down
44 changes: 37 additions & 7 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,46 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop

# file descriptor interface, see Crystal::EventLoop::FileDescriptor

def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno
def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | Errno
path.check_no_null_byte

fd = LibC.open(path, flags | LibC::O_CLOEXEC, permissions)

if fd == -1
Errno.value
else
fd
fd = 0
flags |= LibC::O_CLOEXEC
blocking = !System::File.special_type?(path) if blocking.nil?

# We can't reliably detect without significant overhead whether the file at
# *path* might block for a while before calling open; while O_NONBLOCK has
# no effect on regular disk files, special file types are a different story.
# Open with O_NONBLOCK will fail with ENXIO for O_WRONLY (no connected
# reader) but it will always succeed for O_RDONLY (regardless of a connected
# writer or not), then any attempt to read will return EOF, leaving no means
# to wait until a writer connects.
#
# We thus rely on the *blocking* arg: when false the file might be a special
# file type, so we check it; if it's a fifo (named pipe) or a character
# device, we open in another thread so we don't risk blocking the current
# thread (and thus other fibers) until a reader or writer is also connected.
#
# We need preview_mt to safely re-enqueue the current fiber from the thread.
{% if flag?(:preview_mt) && !flag?(:interpreted) %}
if blocking
fd = LibC.open(path, flags, permissions)
return Errno.value if fd == -1
else
fd, errno = System::File.async_open(path, flags, permissions)
return errno if fd == -1
end
{% else %}
fd = LibC.open(path, flags, permissions)
return Errno.value if fd == -1
{% end %}

unless blocking
status_flags = System::FileDescriptor.fcntl(fd, LibC::F_GETFL)
System::FileDescriptor.fcntl(fd, LibC::F_SETFL, status_flags | LibC::O_NONBLOCK)
end

{fd, blocking}
end

def read(file_descriptor : System::FileDescriptor, slice : Bytes) : Int32
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/event_loop/wasi.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
raise NotImplementedError.new("Crystal::Wasi::EventLoop.create_fd_read_event")
end

def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno | WinError
def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | Errno | WinError
raise NotImplementedError.new("Crystal::Wasi::EventLoop#open")
end

Expand Down
7 changes: 4 additions & 3 deletions src/crystal/system/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ module Crystal::System::File

LOWER_ALPHANUM = "0123456789abcdefghijklmnopqrstuvwxyz".to_slice

def self.mktemp(prefix : String?, suffix : String?, dir : String, random : ::Random = ::Random::DEFAULT) : {FileDescriptor::Handle, String}
def self.mktemp(prefix : String?, suffix : String?, dir : String, random : ::Random = ::Random::DEFAULT) : {FileDescriptor::Handle, String, Bool}
flags = LibC::O_RDWR | LibC::O_CREAT | LibC::O_EXCL
perm = ::File::Permissions.new(0o600)

Expand All @@ -68,8 +68,9 @@ module Crystal::System::File
end

case result = EventLoop.current.open(path, flags, perm, blocking: true)
when FileDescriptor::Handle
return {result, path}
when Tuple(FileDescriptor::Handle, Bool)
fd, blocking = result
return {fd, path, blocking}
when Errno::EEXIST, WinError::ERROR_FILE_EXISTS
# retry
else
Expand Down
37 changes: 34 additions & 3 deletions src/crystal/system/unix/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,49 @@ require "file/error"

# :nodoc:
module Crystal::System::File
def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : FileDescriptor::Handle
def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : {FileDescriptor::Handle, Bool}
perm = ::File::Permissions.new(perm) if perm.is_a? Int32

case result = EventLoop.current.open(filename, open_flag(mode), perm, blocking)
in FileDescriptor::Handle
in Tuple(FileDescriptor::Handle, Bool)
result
in Errno
raise ::File::Error.from_os_error("Error opening file with mode '#{mode}'", result, file: filename)
end
end

protected def system_set_mode(mode : String)
{% if flag?(:preview_mt) && !flag?(:interpreted) %}
protected def self.async_open(filename, flags, permissions)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

polish: "async" seems a big ambiguous in the context of the event loop. The method name could express better that this is about parallel open in a different thread. Maybe .open_threaded?

fd = -1
errno = Errno::NONE
fiber = ::Fiber.current

::Thread.new do
fd = LibC.open(filename, flags, permissions)
errno = Errno.value if fd == -1

{% if flag?(:execution_context) %}
fiber.enqueue
{% else %}
# HACK: avoid Fiber#enqueue because it would lazily create a scheduler
# for the thread just to send the fiber to another scheduler
fiber.get_current_thread.not_nil!.scheduler.send_fiber(fiber)
{% end %}
end
::Fiber.suspend

{fd, errno}
end
{% end %}

protected def system_init(mode : String, blocking : Bool) : Nil
end

def self.special_type?(path : String) : Bool
path.check_no_null_byte
stat = uninitialized LibC::Stat
ret = stat(path, pointerof(stat))
ret != -1 && (stat.st_mode & LibC::S_IFMT).in?(LibC::S_IFCHR, LibC::S_IFIFO)
end

def self.info?(path : String, follow_symlinks : Bool) : ::File::Info?
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/unix/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module Crystal::System::FileDescriptor
fcntl(LibC::F_SETFL, new_flags) unless new_flags == current_flags
end

private def system_blocking_init(blocking : Bool?)
protected def system_blocking_init(blocking : Bool?)
if blocking.nil?
blocking =
case system_info.type
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/wasi/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ require "../unix/file"

# :nodoc:
module Crystal::System::File
protected def system_set_mode(mode : String)
protected def system_init(mode : String, blocking : Bool) : Nil
end

def self.chmod(path, mode)
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/wasi/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module Crystal::System::FileDescriptor
r
end

private def system_blocking_init(blocking : Bool?)
protected def system_blocking_init(blocking : Bool?)
end

private def system_reopen(other : IO::FileDescriptor)
Expand Down
7 changes: 4 additions & 3 deletions src/crystal/system/win32/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Crystal::System::File
# write at the end of the file.
getter? system_append = false

def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : FileDescriptor::Handle
def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : {FileDescriptor::Handle, Bool}
perm = ::File::Permissions.new(perm) if perm.is_a? Int32
# Only the owner writable bit is used, since windows only supports
# the read only attribute.
Expand All @@ -25,7 +25,7 @@ module Crystal::System::File
end

case result = EventLoop.current.open(filename, open_flag(mode), ::File::Permissions.new(perm), blocking != false)
in FileDescriptor::Handle
in Tuple(FileDescriptor::Handle, Bool)
result
in WinError
raise ::File::Error.from_os_error("Error opening file with mode '#{mode}'", result, file: filename)
Expand Down Expand Up @@ -88,8 +88,9 @@ module Crystal::System::File
{access, disposition, attributes}
end

protected def system_set_mode(mode : String)
protected def system_init(mode : String, blocking : Bool) : Nil
@system_append = true if mode.starts_with?('a')
@system_blocking = blocking
end

private def write_blocking(handle, slice)
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/win32/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ module Crystal::System::FileDescriptor
end
end

def system_blocking_init(blocking : Bool?)
protected def system_blocking_init(blocking : Bool?)
if blocking.nil?
# there are no official API to know whether a handle has been opened with
# the OVERLAPPED flag, but the following call is supposed to leak the
Expand Down
7 changes: 6 additions & 1 deletion src/fiber/execution_context.cr
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ module Fiber::ExecutionContext
Thread.current.execution_context
end

def self.current? : ExecutionContext?
Thread.current.execution_context?
end

# :nodoc:
#
# Tells the current scheduler to suspend the current fiber and resume the
Expand Down Expand Up @@ -181,6 +185,7 @@ module Fiber::ExecutionContext
#
# Enqueues a fiber to be resumed inside the execution context.
#
# May be called from any ExecutionContext (i.e. must be thread-safe).
# May be called from any ExecutionContext (i.e. must be thread-safe). May also
# be called from bare threads (outside of an ExecutionContext).
abstract def enqueue(fiber : Fiber) : Nil
end
2 changes: 1 addition & 1 deletion src/fiber/execution_context/multi_threaded.cr
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ module Fiber::ExecutionContext

# :nodoc:
def enqueue(fiber : Fiber) : Nil
if ExecutionContext.current == self
if ExecutionContext.current? == self
# local enqueue: push to local queue of current scheduler
ExecutionContext::Scheduler.current.enqueue(fiber)
else
Expand Down
2 changes: 1 addition & 1 deletion src/fiber/execution_context/single_threaded.cr
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ module Fiber::ExecutionContext

# :nodoc:
def enqueue(fiber : Fiber) : Nil
if ExecutionContext.current == self
if ExecutionContext.current? == self
# local enqueue
Crystal.trace :sched, "enqueue", fiber: fiber
@runnables.push(fiber)
Expand Down
Loading