Skip to content

Commit

Permalink
Rework implementation to use Fiber#transfer. Fixes #23.
Browse files Browse the repository at this point in the history
Ruby 3 allows mixing `Fiber#resume/#yield` and `Fiber#transfer`. We can
take advantage of that to minimise the impact of non-blocking operations
on user flow control.

Previously, non-blocking operations would invoke `Fiber.yield` and this
was a user-visible side-effect. We did take advantage of it, but it also
meant that integration of Async with existing usage of Fiber could be
problematic. We tracked the most obvious issues in `enumerator_spec.rb`.

Now, non-blocking operations transfer directly to the scheduler fiber
and thus don't impact other usage of resume/yield.
  • Loading branch information
ioquatix committed Jun 6, 2021
1 parent e63a793 commit a9a83b3
Show file tree
Hide file tree
Showing 28 changed files with 651 additions and 1,184 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ jobs:
- macos

ruby:
- 2.5
- 2.6
- 2.7
- 3.0
- head

experimental: [false]
env: [""]
Expand All @@ -34,7 +31,7 @@ jobs:
ruby: head
experimental: true
- os: ubuntu
ruby: 2.6
ruby: head
env: COVERAGE=PartialSummary,Coveralls
experimental: true

Expand Down
5 changes: 3 additions & 2 deletions async.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ Gem::Specification.new do |spec|

spec.files = Dir.glob('{lib}/**/*', File::FNM_DOTMATCH, base: __dir__)

spec.required_ruby_version = ">= 2.5.0"
spec.required_ruby_version = ">= 3.1.0"

spec.add_dependency "console", "~> 1.10"
spec.add_dependency "nio4r", "~> 2.3"

spec.add_dependency "event"
spec.add_dependency "timers", "~> 4.1"

spec.add_development_dependency "async-rspec", "~> 1.1"
Expand Down
2 changes: 1 addition & 1 deletion examples/fibers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def timeout(duration)

timer = reactor.add_timer(duration) do
if self.alive?
error = Fiber::TimeoutError.new("execution expired")
error = Fiber::TimeoutError.new
error.set_backtrace backtrace
self.resume error
end
Expand Down
3 changes: 3 additions & 0 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

gemspec

# gem "event", path: "../event"
# gem "async-rspec", path: "../async-rspec"

group :maintenance, optional: true do
gem "bake-bundler"
gem "bake-modernize"
Expand Down
5 changes: 2 additions & 3 deletions lib/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
# THE SOFTWARE.

require_relative "async/version"
require_relative "async/logger"
require_relative "async/reactor"

require_relative "kernel/async"
require_relative "kernel/sync"

module Async
# Invoke `Reactor.run` with all arguments/block.
def self.run(*arguments, &block)
Reactor.run(*arguments, &block)
def self.run(...)
Reactor.run(...)
end
end
8 changes: 3 additions & 5 deletions lib/async/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ def wait
fiber = Fiber.current
@waiting << fiber

Task.yield

# It would be nice if there was a better construct for this. We only need to invoke #delete if the task was not resumed normally. This can only occur with `raise` and `throw`. But there is no easy way to detect this.
# ensure when not return or ensure when raise, throw
Fiber.scheduler.transfer
rescue Exception
# It would be nice if there was a better construct for this. We only need to invoke #delete if the task was not resumed normally. This can only occur with `raise` and `throw`. But there is no easy way to detect this.
@waiting.delete(fiber)
raise
end
Expand All @@ -61,7 +59,7 @@ def signal(value = nil)
@waiting = []

waiting.each do |fiber|
fiber.resume(value) if fiber.alive?
Fiber.scheduler.resume(fiber, value) if fiber.alive?
end

return nil
Expand Down
60 changes: 6 additions & 54 deletions lib/async/debug/selector.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
# Copyright, 2021, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
Expand All @@ -20,62 +20,14 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'monitor'
require_relative '../logger'

require 'nio'
require 'set'
require 'fiber'
require 'event/debug/selector'

module Async
module Debug
class LeakError < RuntimeError
def initialize(monitors)
super "Trying to close selector with active monitors: #{monitors.inspect}! This may cause your socket or file descriptor to leak."
end
end

class Selector
def initialize(selector = NIO::Selector.new)
@selector = selector
@monitors = Set.new
end

def register(object, interests)
Async.logger.debug(self) {"Registering #{object.inspect} for #{interests}."}

unless io = ::IO.try_convert(object)
raise RuntimeError, "Could not convert #{io} into IO!"
end

monitor = Monitor.new(@selector.register(object, interests), self)

@monitors.add(monitor)

return monitor
end

def deregister(monitor)
Async.logger.debug(self) {"Deregistering #{monitor.inspect}."}

unless @monitors.delete?(monitor)
raise RuntimeError, "Trying to remove monitor for #{monitor.inspect} but it was not registered!"
end
end

def wakeup
@selector.wakeup
end

def close
if @monitors.any?
raise LeakError, @monitors
end
ensure
@selector.close
end

def select(*arguments)
@selector.select(*arguments)
class Selector < Event::Debug::Selector
def initialize(selector = nil)
super(selector || Event::Backend.new(Fiber.current))
end
end
end
Expand Down
45 changes: 24 additions & 21 deletions lib/async/debug/monitor.rb → lib/async/interrupt.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# frozen_string_literal: true

# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
# Copyright, 2020, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
Expand All @@ -20,28 +18,33 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'delegate'

module Async
module Debug
class Monitor < Delegator
def initialize(monitor, selector)
@monitor = monitor
@selector = selector
end

def __getobj__
@monitor
end
# A thread safe synchronisation primative.
class Interrupt
def initialize(scheduler, &block)
@scheduler = scheduler
@input, @output = IO.pipe

def close
@selector.deregister(self)
@monitor.close
@fiber = Fiber.new do
while true
@scheduler.io_wait(@fiber, @input, ::Event::READABLE)
block.call(@input.read_nonblock(1))
end
end

def inspect
"\#<#{self.class} io=#{@monitor.io.inspect} interests=#{@monitor.interests.inspect} readiness=#{@monitor.readiness.inspect}>"
end
@fiber.transfer
end

def signal(event = '.')
@output.write('.')
@output.flush
end

def close
@input.close
@output.close
end
end

private_constant :Interrupt
end
28 changes: 0 additions & 28 deletions lib/async/logger.rb

This file was deleted.

10 changes: 9 additions & 1 deletion lib/async/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ def initialize(parent = nil, annotation: nil, transient: false)
end
end

def root
@parent&.root || self
end

# You should not directly rely on these pointers but instead use `#children`.
# List pointers:
attr_accessor :head
Expand Down Expand Up @@ -225,7 +229,7 @@ def backtrace(*arguments)
end

def to_s
"\#<#{description}>"
"\#<#{self.description}>"
end

# Change the parent of this node.
Expand Down Expand Up @@ -329,6 +333,10 @@ def stop(later = false)
end
end

def stopped?
@children.nil?
end

def print_hierarchy(out = $stdout, backtrace: true)
self.traverse do |node, level|
indent = "\t" * level
Expand Down
6 changes: 3 additions & 3 deletions lib/async/notification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Notification < Condition
def signal(value = nil, task: Task.current)
return if @waiting.empty?

task.reactor << Signal.new(@waiting, value)
Fiber.scheduler << Signal.new(@waiting, value)

@waiting = []

Expand All @@ -42,9 +42,9 @@ def alive?
true
end

def resume
def transfer
waiting.each do |fiber|
fiber.resume(value) if fiber.alive?
fiber.transfer(value) if fiber.alive?
end
end
end
Expand Down
Loading

0 comments on commit a9a83b3

Please sign in to comment.