diff --git a/spec/std/file/tempfile_spec.cr b/spec/std/file/tempfile_spec.cr index 84d9cd553398..c6721cf342d8 100644 --- a/spec/std/file/tempfile_spec.cr +++ b/spec/std/file/tempfile_spec.cr @@ -197,7 +197,7 @@ describe Crystal::System::File do it "creates random file name" do with_tempfile "random-path" do |tempdir| Dir.mkdir tempdir - fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14])) + fd, path, _ = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14])) path.should eq Path[tempdir, "A789abcdeZ"].to_s ensure IO::FileDescriptor.new(fd).close if fd @@ -209,7 +209,7 @@ describe Crystal::System::File do Dir.mkdir tempdir existing_path = Path[tempdir, "A789abcdeZ"] File.touch existing_path - fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22])) + fd, path, _ = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22])) path.should eq File.join(tempdir, "AfghijklmZ") ensure IO::FileDescriptor.new(fd).close if fd @@ -221,7 +221,7 @@ describe Crystal::System::File do Dir.mkdir tempdir File.touch Path[tempdir, "A789abcdeZ"] expect_raises(File::AlreadyExistsError, "Error creating temporary file") do - fd, path = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14])) + fd, path, _ = Crystal::System::File.mktemp("A", "Z", dir: tempdir, random: TestRNG.new([7, 8, 9, 10, 11, 12, 13, 14])) ensure IO::FileDescriptor.new(fd).close if fd end diff --git a/spec/std/file_spec.cr b/spec/std/file_spec.cr index 9b3b7bfce6d3..e60597429c08 100644 --- a/spec/std/file_spec.cr +++ b/spec/std/file_spec.cr @@ -94,19 +94,25 @@ describe "File" do ret = LibC.mkfifo(path, File::DEFAULT_CREATE_PERMISSIONS) raise RuntimeError.from_errno("mkfifo") unless ret == 0 - # FIXME: open(2) will block when opening a fifo file until another - # thread or process also opened the file; we should pass - # O_NONBLOCK to the open(2) call itself, not afterwards - file = nil - new_thread { file = File.new(path, "w", blocking: nil) } + # open(2) blocks when opening a pipe/fifo or chardev file until + # another thread or process also opened the file in the opposite mode + w = nil + {% if flag?(:preview_mt) %} + # the evloop handles opening the fifo file without blocking the + # current thread (only the current fiber) + spawn { w = File.open(path, "w", blocking: nil) } + {% else %} + # we must explicitly spawn a thread + new_thread { w = File.new(path, "w") } + {% end %} begin - File.open(path, "r", blocking: false) do |file| - file.blocking.should be_false + File.open(path, "r", blocking: false) do |r| + r.blocking.should be_false end ensure File.delete(path) - file.try(&.close) + w.try(&.close) end end {% end %} diff --git a/src/crystal/event_loop/file_descriptor.cr b/src/crystal/event_loop/file_descriptor.cr index 8b6d15f219da..61a198892cda 100644 --- a/src/crystal/event_loop/file_descriptor.cr +++ b/src/crystal/event_loop/file_descriptor.cr @@ -7,7 +7,7 @@ abstract class Crystal::EventLoop # `nil`. # # Returns the system file descriptor or handle, or a system error. - abstract def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno | WinError + abstract def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | Errno | WinError # Reads at least one byte from the file descriptor into *slice*. # diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index c37b26424827..7ae53a867590 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -223,7 +223,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop FiberEvent.new(:select_timeout, fiber) end - def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | WinError + def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | WinError access, disposition, attributes = System::File.posix_to_open_opts(flags, permissions, blocking) handle = LibC.CreateFileW( @@ -239,7 +239,8 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop if handle == LibC::INVALID_HANDLE_VALUE WinError.value else - handle.address + create_completion_port(handle) unless blocking + {handle.address, !!blocking} end end diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index d207961fa2d3..fbedc7cdb765 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -108,16 +108,46 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop end end - def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno + def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | Errno path.check_no_null_byte - fd = LibC.open(path, flags | LibC::O_CLOEXEC, permissions) - - if fd == -1 - Errno.value - else - fd + fd = 0 + flags |= LibC::O_CLOEXEC + blocking = !System::File.special_type?(path) if blocking.nil? + + # We can't reliably detect without significant overhead whether the file at + # *path* might block for a while before calling open; while O_NONBLOCK has + # no effect on regular disk files, special file types are a different story. + # Open with O_NONBLOCK will fail with ENXIO for O_WRONLY (no connected + # reader) but it will always succeed for O_RDONLY (regardless of a connected + # writer or not), then any attempt to read will return EOF, leaving no means + # to wait until a writer connects. + # + # We thus rely on the *blocking* arg: when false the file might be a special + # file type, so we check it; if it's a fifo (named pipe) or a character + # device, we open in another thread so we don't risk blocking the current + # thread (and thus other fibers) until a reader or writer is also connected. + # + # We need preview_mt to safely re-enqueue the current fiber from the thread. + {% if flag?(:preview_mt) && !flag?(:interpreted) %} + if blocking + fd = LibC.open(path, flags, permissions) + return Errno.value if fd == -1 + else + fd, errno = System::File.async_open(path, flags, permissions) + return errno if fd == -1 + end + {% else %} + fd = LibC.open(path, flags, permissions) + return Errno.value if fd == -1 + {% end %} + + unless blocking + status_flags = System::FileDescriptor.fcntl(fd, LibC::F_GETFL) + System::FileDescriptor.fcntl(fd, LibC::F_SETFL, status_flags | LibC::O_NONBLOCK) end + + {fd, blocking} end def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 4391e84f9a90..9bc098a0c672 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -154,16 +154,46 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop # file descriptor interface, see Crystal::EventLoop::FileDescriptor - def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno + def open(path : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | Errno path.check_no_null_byte - fd = LibC.open(path, flags | LibC::O_CLOEXEC, permissions) - - if fd == -1 - Errno.value - else - fd + fd = 0 + flags |= LibC::O_CLOEXEC + blocking = !System::File.special_type?(path) if blocking.nil? + + # We can't reliably detect without significant overhead whether the file at + # *path* might block for a while before calling open; while O_NONBLOCK has + # no effect on regular disk files, special file types are a different story. + # Open with O_NONBLOCK will fail with ENXIO for O_WRONLY (no connected + # reader) but it will always succeed for O_RDONLY (regardless of a connected + # writer or not), then any attempt to read will return EOF, leaving no means + # to wait until a writer connects. + # + # We thus rely on the *blocking* arg: when false the file might be a special + # file type, so we check it; if it's a fifo (named pipe) or a character + # device, we open in another thread so we don't risk blocking the current + # thread (and thus other fibers) until a reader or writer is also connected. + # + # We need preview_mt to safely re-enqueue the current fiber from the thread. + {% if flag?(:preview_mt) && !flag?(:interpreted) %} + if blocking + fd = LibC.open(path, flags, permissions) + return Errno.value if fd == -1 + else + fd, errno = System::File.async_open(path, flags, permissions) + return errno if fd == -1 + end + {% else %} + fd = LibC.open(path, flags, permissions) + return Errno.value if fd == -1 + {% end %} + + unless blocking + status_flags = System::FileDescriptor.fcntl(fd, LibC::F_GETFL) + System::FileDescriptor.fcntl(fd, LibC::F_SETFL, status_flags | LibC::O_NONBLOCK) end + + {fd, blocking} end def read(file_descriptor : System::FileDescriptor, slice : Bytes) : Int32 diff --git a/src/crystal/event_loop/wasi.cr b/src/crystal/event_loop/wasi.cr index fd77fa91ab45..60ba11d738b0 100644 --- a/src/crystal/event_loop/wasi.cr +++ b/src/crystal/event_loop/wasi.cr @@ -28,7 +28,7 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop raise NotImplementedError.new("Crystal::Wasi::EventLoop.create_fd_read_event") end - def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : System::FileDescriptor::Handle | Errno | WinError + def open(filename : String, flags : Int32, permissions : File::Permissions, blocking : Bool?) : {System::FileDescriptor::Handle, Bool} | Errno | WinError raise NotImplementedError.new("Crystal::Wasi::EventLoop#open") end diff --git a/src/crystal/system/file.cr b/src/crystal/system/file.cr index cd25ad4c8777..93c88e09b3f4 100644 --- a/src/crystal/system/file.cr +++ b/src/crystal/system/file.cr @@ -51,7 +51,7 @@ module Crystal::System::File LOWER_ALPHANUM = "0123456789abcdefghijklmnopqrstuvwxyz".to_slice - def self.mktemp(prefix : String?, suffix : String?, dir : String, random : ::Random = ::Random::DEFAULT) : {FileDescriptor::Handle, String} + def self.mktemp(prefix : String?, suffix : String?, dir : String, random : ::Random = ::Random::DEFAULT) : {FileDescriptor::Handle, String, Bool} flags = LibC::O_RDWR | LibC::O_CREAT | LibC::O_EXCL perm = ::File::Permissions.new(0o600) @@ -68,8 +68,9 @@ module Crystal::System::File end case result = EventLoop.current.open(path, flags, perm, blocking: true) - when FileDescriptor::Handle - return {result, path} + when Tuple(FileDescriptor::Handle, Bool) + fd, blocking = result + return {fd, path, blocking} when Errno::EEXIST, WinError::ERROR_FILE_EXISTS # retry else diff --git a/src/crystal/system/unix/file.cr b/src/crystal/system/unix/file.cr index a029705145c3..a59e70054245 100644 --- a/src/crystal/system/unix/file.cr +++ b/src/crystal/system/unix/file.cr @@ -3,18 +3,49 @@ require "file/error" # :nodoc: module Crystal::System::File - def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : FileDescriptor::Handle + def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : {FileDescriptor::Handle, Bool} perm = ::File::Permissions.new(perm) if perm.is_a? Int32 case result = EventLoop.current.open(filename, open_flag(mode), perm, blocking) - in FileDescriptor::Handle + in Tuple(FileDescriptor::Handle, Bool) result in Errno raise ::File::Error.from_os_error("Error opening file with mode '#{mode}'", result, file: filename) end end - protected def system_set_mode(mode : String) + {% if flag?(:preview_mt) && !flag?(:interpreted) %} + protected def self.async_open(filename, flags, permissions) + fd = -1 + errno = Errno::NONE + fiber = ::Fiber.current + + ::Thread.new do + fd = LibC.open(filename, flags, permissions) + errno = Errno.value if fd == -1 + + {% if flag?(:execution_context) %} + fiber.enqueue + {% else %} + # HACK: avoid Fiber#enqueue because it would lazily create a scheduler + # for the thread just to send the fiber to another scheduler + fiber.get_current_thread.not_nil!.scheduler.send_fiber(fiber) + {% end %} + end + ::Fiber.suspend + + {fd, errno} + end + {% end %} + + protected def system_init(mode : String, blocking : Bool) : Nil + end + + def self.special_type?(path : String) : Bool + path.check_no_null_byte + stat = uninitialized LibC::Stat + ret = stat(path, pointerof(stat)) + ret != -1 && (stat.st_mode & LibC::S_IFMT).in?(LibC::S_IFCHR, LibC::S_IFIFO) end def self.info?(path : String, follow_symlinks : Bool) : ::File::Info? diff --git a/src/crystal/system/unix/file_descriptor.cr b/src/crystal/system/unix/file_descriptor.cr index 2c360b9d1550..9d1ddb051774 100644 --- a/src/crystal/system/unix/file_descriptor.cr +++ b/src/crystal/system/unix/file_descriptor.cr @@ -34,7 +34,7 @@ module Crystal::System::FileDescriptor fcntl(LibC::F_SETFL, new_flags) unless new_flags == current_flags end - private def system_blocking_init(blocking : Bool?) + protected def system_blocking_init(blocking : Bool?) if blocking.nil? blocking = case system_info.type diff --git a/src/crystal/system/wasi/file.cr b/src/crystal/system/wasi/file.cr index a48463eded4e..5d273a6d602d 100644 --- a/src/crystal/system/wasi/file.cr +++ b/src/crystal/system/wasi/file.cr @@ -2,7 +2,7 @@ require "../unix/file" # :nodoc: module Crystal::System::File - protected def system_set_mode(mode : String) + protected def system_init(mode : String, blocking : Bool) : Nil end def self.chmod(path, mode) diff --git a/src/crystal/system/wasi/file_descriptor.cr b/src/crystal/system/wasi/file_descriptor.cr index 4b9605bde348..38e76447f444 100644 --- a/src/crystal/system/wasi/file_descriptor.cr +++ b/src/crystal/system/wasi/file_descriptor.cr @@ -17,7 +17,7 @@ module Crystal::System::FileDescriptor r end - private def system_blocking_init(blocking : Bool?) + protected def system_blocking_init(blocking : Bool?) end private def system_reopen(other : IO::FileDescriptor) diff --git a/src/crystal/system/win32/file.cr b/src/crystal/system/win32/file.cr index ef2d1ddd0bb2..5bc18a6ea303 100644 --- a/src/crystal/system/win32/file.cr +++ b/src/crystal/system/win32/file.cr @@ -14,7 +14,7 @@ module Crystal::System::File # write at the end of the file. getter? system_append = false - def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : FileDescriptor::Handle + def self.open(filename : String, mode : String, perm : Int32 | ::File::Permissions, blocking : Bool?) : {FileDescriptor::Handle, Bool} perm = ::File::Permissions.new(perm) if perm.is_a? Int32 # Only the owner writable bit is used, since windows only supports # the read only attribute. @@ -25,7 +25,7 @@ module Crystal::System::File end case result = EventLoop.current.open(filename, open_flag(mode), ::File::Permissions.new(perm), blocking != false) - in FileDescriptor::Handle + in Tuple(FileDescriptor::Handle, Bool) result in WinError raise ::File::Error.from_os_error("Error opening file with mode '#{mode}'", result, file: filename) @@ -88,8 +88,9 @@ module Crystal::System::File {access, disposition, attributes} end - protected def system_set_mode(mode : String) + protected def system_init(mode : String, blocking : Bool) : Nil @system_append = true if mode.starts_with?('a') + @system_blocking = blocking end private def write_blocking(handle, slice) diff --git a/src/crystal/system/win32/file_descriptor.cr b/src/crystal/system/win32/file_descriptor.cr index 65c950171b5b..42f0c4905891 100644 --- a/src/crystal/system/win32/file_descriptor.cr +++ b/src/crystal/system/win32/file_descriptor.cr @@ -104,7 +104,7 @@ module Crystal::System::FileDescriptor end end - def system_blocking_init(blocking : Bool?) + protected def system_blocking_init(blocking : Bool?) if blocking.nil? # there are no official API to know whether a handle has been opened with # the OVERLAPPED flag, but the following call is supposed to leak the diff --git a/src/fiber/execution_context.cr b/src/fiber/execution_context.cr index 1c1ae2f1eb21..8d2289740ebc 100644 --- a/src/fiber/execution_context.cr +++ b/src/fiber/execution_context.cr @@ -122,6 +122,10 @@ module Fiber::ExecutionContext Thread.current.execution_context end + def self.current? : ExecutionContext? + Thread.current.execution_context? + end + # :nodoc: # # Tells the current scheduler to suspend the current fiber and resume the @@ -181,6 +185,7 @@ module Fiber::ExecutionContext # # Enqueues a fiber to be resumed inside the execution context. # - # May be called from any ExecutionContext (i.e. must be thread-safe). + # May be called from any ExecutionContext (i.e. must be thread-safe). May also + # be called from bare threads (outside of an ExecutionContext). abstract def enqueue(fiber : Fiber) : Nil end diff --git a/src/fiber/execution_context/multi_threaded.cr b/src/fiber/execution_context/multi_threaded.cr index 9793c5468b0b..e843178d1d10 100644 --- a/src/fiber/execution_context/multi_threaded.cr +++ b/src/fiber/execution_context/multi_threaded.cr @@ -180,7 +180,7 @@ module Fiber::ExecutionContext # :nodoc: def enqueue(fiber : Fiber) : Nil - if ExecutionContext.current == self + if ExecutionContext.current? == self # local enqueue: push to local queue of current scheduler ExecutionContext::Scheduler.current.enqueue(fiber) else diff --git a/src/fiber/execution_context/single_threaded.cr b/src/fiber/execution_context/single_threaded.cr index 74ad81ea51ce..f4a2aa4f3694 100644 --- a/src/fiber/execution_context/single_threaded.cr +++ b/src/fiber/execution_context/single_threaded.cr @@ -102,7 +102,7 @@ module Fiber::ExecutionContext # :nodoc: def enqueue(fiber : Fiber) : Nil - if ExecutionContext.current == self + if ExecutionContext.current? == self # local enqueue Crystal.trace :sched, "enqueue", fiber: fiber @runnables.push(fiber) diff --git a/src/file.cr b/src/file.cr index 5c39fa0e5a15..2fba32efb1fb 100644 --- a/src/file.cr +++ b/src/file.cr @@ -125,16 +125,13 @@ class File < IO::FileDescriptor include Crystal::System::File - # This constructor is provided for subclasses to be able to initialize an - # `IO::FileDescriptor` with a *path* and *fd*. - private def initialize(@path, fd : Int, blocking = false, encoding = nil, invalid = nil) - self.set_encoding(encoding, invalid: invalid) if encoding - super(fd, blocking) - end - - # :nodoc: - def self.from_fd(path : String, fd : Int, *, blocking = false, encoding = nil, invalid = nil) - new(path, fd, blocking: blocking, encoding: encoding, invalid: invalid) + # This constructor is for constructors to be able to initialize a `File` with + # a *path* and *fd*. The *blocking* param is informational and must reflect + # the non/blocking state of the underlying fd. + private def initialize(@path, fd : Int, mode = "", blocking = true, encoding = nil, invalid = nil) + super(handle: fd) + system_init(mode, blocking) + set_encoding(encoding, invalid: invalid) if encoding end # Opens the file named by *filename*. @@ -173,8 +170,8 @@ class File < IO::FileDescriptor # additional syscall. def self.new(filename : Path | String, mode = "r", perm = DEFAULT_CREATE_PERMISSIONS, encoding = nil, invalid = nil, blocking = true) filename = filename.to_s - fd = Crystal::System::File.open(filename, mode, perm: perm, blocking: blocking) - new(filename, fd, blocking: blocking, encoding: encoding, invalid: invalid).tap { |f| f.system_set_mode(mode) } + fd, blocking = Crystal::System::File.open(filename, mode, perm: perm, blocking: blocking) + new(filename, fd, mode, !!blocking, encoding, invalid) end getter path : String diff --git a/src/file/tempfile.cr b/src/file/tempfile.cr index e1b764f35dbf..216f31bde870 100644 --- a/src/file/tempfile.cr +++ b/src/file/tempfile.cr @@ -64,8 +64,8 @@ class File # # It is the caller's responsibility to remove the file when no longer needed. def self.tempfile(prefix : String?, suffix : String?, *, dir : String = Dir.tempdir, encoding = nil, invalid = nil) - fileno, path = Crystal::System::File.mktemp(prefix, suffix, dir) - new(path, fileno, blocking: true, encoding: encoding, invalid: invalid) + fileno, path, blocking = Crystal::System::File.mktemp(prefix, suffix, dir) + new(path, fileno, blocking: blocking, encoding: encoding, invalid: invalid) end # Creates a temporary file. diff --git a/src/io/file_descriptor.cr b/src/io/file_descriptor.cr index 9cbbc759e65c..7ca360eabdf0 100644 --- a/src/io/file_descriptor.cr +++ b/src/io/file_descriptor.cr @@ -39,11 +39,17 @@ class IO::FileDescriptor < IO write_timeout end - def initialize(fd : Handle, blocking = nil, *, @close_on_finalize = true) - @volatile_fd = Atomic.new(fd) + def self.new(fd : Handle, blocking = nil, *, close_on_finalize = true) + file_descriptor = new(handle: fd, close_on_finalize: close_on_finalize) + file_descriptor.system_blocking_init(blocking) unless file_descriptor.closed? + file_descriptor + end + + # :nodoc: + def initialize(*, handle : Handle, @close_on_finalize = true) + @volatile_fd = Atomic.new(handle) @closed = true # This is necessary so we can reference `self` in `system_closed?` (in case of an exception) @closed = system_closed? - system_blocking_init(blocking) unless @closed end # :nodoc: