Skip to content

Commit

Permalink
Merge pull request #291 from phstc/thread-pool
Browse files Browse the repository at this point in the history
RIP Celluloid, Hello concurrent-ruby/Thread pool

Fixes #185
  • Loading branch information
phstc authored Feb 14, 2017
2 parents 087b175 + 7f18e02 commit 4f2ba80
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 295 deletions.
1 change: 0 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ end
desc 'Open Shoryuken pry console'
task :console do
require 'pry'
require 'celluloid/current'
require 'shoryuken'

config_file = File.join File.expand_path('..', __FILE__), 'shoryuken.yml'
Expand Down
2 changes: 1 addition & 1 deletion examples/default_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ class DefaultWorker
shoryuken_options queue: 'default', auto_delete: true

def perform(sqs_msg, body)
Shoryuken.logger.info("Received message: '#{body}'")
Shoryuken.logger.debug("Received message: '#{body}'")
end
end
6 changes: 6 additions & 0 deletions lib/shoryuken.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'yaml'
require 'json'
require 'aws-sdk-core'
require 'time'
require 'concurrent'

require 'shoryuken/version'
require 'shoryuken/core_ext'
Expand All @@ -22,6 +24,10 @@
require 'shoryuken/sns_arn'
require 'shoryuken/topic'
require 'shoryuken/polling'
require 'shoryuken/manager'
require 'shoryuken/launcher'
require 'shoryuken/processor'
require 'shoryuken/fetcher'

module Shoryuken
DEFAULTS = {
Expand Down
46 changes: 13 additions & 33 deletions lib/shoryuken/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ class CLI
include Util
include Singleton

attr_accessor :launcher

def run(args)
self_read, self_write = IO.pipe

Expand All @@ -41,56 +39,45 @@ def run(args)

loader.load

load_celluloid
initialize_concurrent_logger

require 'shoryuken/launcher'
@launcher = Shoryuken::Launcher.new

if callback = Shoryuken.start_callback
if (callback = Shoryuken.start_callback)
logger.info { 'Calling Shoryuken.on_start block' }
callback.call
end

fire_event(:startup)

begin
launcher.run
@launcher.run

while (readable_io = IO.select([self_read]))
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
launcher.stop(shutdown: true)
@launcher.stop(shutdown: true)
exit 0
end
end

private

def load_celluloid
require 'celluloid/current'
Celluloid.logger = (Shoryuken.options[:verbose] ? Shoryuken.logger : nil)

require 'shoryuken/manager'
end
def initialize_concurrent_logger
return unless Shoryuken.logger

def celluloid_loaded?
defined?(::Celluloid)
Concurrent.global_logger = lambda do |level, progname, msg = nil, &block|
Shoryuken.logger.log(level, msg, progname, &block)
end
end

def daemonize(options)
return unless options[:daemon]

fail ArgumentError, "You really should set a logfile if you're going to daemonize" unless options[:logfile]

if celluloid_loaded?
# Celluloid can't be loaded until after we've daemonized
# because it spins up threads and creates locks which get
# into a very bad state if forked.
raise "Celluloid cannot be required until here, or it will break Shoryuken's daemonization"
end

files_to_reopen = []
ObjectSpace.each_object(File) do |file|
files_to_reopen << file unless file.closed?
Expand All @@ -116,11 +103,9 @@ def daemonize(options)
end

def write_pid(options)
if (path = options[:pidfile])
File.open(path, 'w') do |f|
f.puts Process.pid
end
end
return unless (path = options[:pidfile])

File.open(path, 'w') { |f| f.puts(Process.pid) }
end

def parse_cli_args(argv)
Expand Down Expand Up @@ -187,7 +172,7 @@ def handle_signal(sig)
when 'USR1'
logger.info { 'Received USR1, will soft shutdown down' }

launcher.stop
@launcher.stop
fire_event(:quiet, true)
exit 0
when 'TTIN'
Expand All @@ -200,11 +185,6 @@ def handle_signal(sig)
end
end

ready = launcher.manager.instance_variable_get(:@ready).size
busy = launcher.manager.instance_variable_get(:@busy).size
queues = launcher.manager.instance_variable_get(:@queues)

logger.info { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{unparse_queues(queues)}" }
else
logger.info { "Received #{sig}, will shutdown down" }

Expand Down
32 changes: 15 additions & 17 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,21 @@ class Fetcher
FETCH_LIMIT = 10

def fetch(queue, available_processors)
watchdog('Fetcher#fetch died') do
started_at = Time.now

logger.debug { "Looking for new messages in '#{queue}'" }

begin
limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors

sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } if !sqs_msgs.empty?
logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
sqs_msgs
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
[]
end
started_at = Time.now

logger.debug { "Looking for new messages in '#{queue}'" }

begin
limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors

sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } unless sqs_msgs.empty?
logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
sqs_msgs
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
[]
end
end

Expand Down
33 changes: 5 additions & 28 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
@@ -1,42 +1,19 @@
module Shoryuken
class Launcher
include Celluloid
include Util

trap_exit :actor_died

attr_accessor :manager

def initialize
@condvar = Celluloid::Condition.new
@manager = Shoryuken::Manager.new_link(@condvar)

@done = false

manager.fetcher = Shoryuken::Fetcher.new
manager.polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues)
@manager = Shoryuken::Manager.new(Shoryuken::Fetcher.new,
Shoryuken.options[:polling_strategy].new(Shoryuken.queues))
end

def stop(options = {})
watchdog('Launcher#stop') do
@done = true

manager.async.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout])
@condvar.wait
manager.terminate
end
@manager.stop(shutdown: !options[:shutdown].nil?,
timeout: Shoryuken.options[:timeout])
end

def run
watchdog('Launcher#run') do
manager.async.start
end
end

def actor_died(actor, reason)
return if @done
logger.warn { "Shoryuken died due to the following error, cannot recover, process exiting: #{reason}" }
exit 1
@manager.start
end
end
end
Loading

0 comments on commit 4f2ba80

Please sign in to comment.