Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RIP Celluloid, Hello concurrent-ruby/Thread pool #291

Merged
merged 47 commits into from
Feb 14, 2017
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
f1df7ab
WIP
Dec 19, 2016
1f3d3c4
WIP simple thread pool
Dec 19, 2016
b8f0830
Clean up celluloid references
Dec 19, 2016
ca8504f
Fix hard shutdown.
Dec 19, 2016
cb2a0e7
Clean TTIN
Dec 19, 2016
be67d4b
Implement AutoExtendVisibility using Concurrent::TimerTask
Dec 19, 2016
85c612d
Remove no longer needed methods
Dec 19, 2016
ad0a57e
Only delay hard shutdown if there's any busy worker
Dec 19, 2016
a33838c
Remove sleep used for debugging
Dec 19, 2016
ca15687
Clean up
Dec 19, 2016
da429bc
Fix specs
Dec 19, 2016
ff73931
Disable brakeman
Dec 19, 2016
e8f784f
Merge branch 'master' into thread-pool
Dec 19, 2016
dd9bfb4
Merge branch 'master' into thread-pool
Dec 19, 2016
606f0ca
Merge branch 'master' into thread-pool
Dec 19, 2016
ffd3517
Merge branch 'master' into thread-pool
Dec 19, 2016
79705a1
Remove safe navigation operator it isn't supported prior Ruby 2.3 :sad:
Dec 19, 2016
7315786
Delay is no longer needed for soft shutdown
Dec 19, 2016
dfee6d4
Wrap dispatch_later with a mutex to avoid concurrency comming from the
Dec 19, 2016
f20b676
Naming refactor
Dec 19, 2016
3bad1e0
watchdog is no longer needed
Dec 19, 2016
9b9d4b3
Remove Mutex in favor of `make_true`
Dec 19, 2016
4bd68ea
Turn dispatch/dispatch_later into a heartbeat
Dec 19, 2016
ef295c8
Fix specs
Dec 19, 2016
474838e
Use `Concurrent::TimerTask` as a heartbeat
Dec 19, 2016
8cd2817
Clean up
Dec 19, 2016
90a1363
Clean auto extend visibility
Dec 19, 2016
7803300
Better hard_shutdown_in handling with wait_for_termination
Dec 19, 2016
3962069
Naming refactor
Dec 21, 2016
eb2edff
Decrease the heartbeat interval to 0.25
Dec 23, 2016
8aa9124
Test PutsReq
Dec 23, 2016
766fe99
Bump delay to 1 sec
Dec 23, 2016
9563b08
Change delay to 0.10
Dec 23, 2016
91c3c2f
Add screenshots
Dec 23, 2016
f61c566
Dispatch when done
Dec 23, 2016
5ebfed5
non-stop dispatch
Dec 24, 2016
ac52dab
Dispatch when done
Dec 24, 2016
1fc4146
Test wait_time_seconds:10
Dec 24, 2016
1eb7092
execution_interval: 0.05
Dec 24, 2016
59bc99b
execution_interval: 0.15
Dec 24, 2016
540b45a
A manager every 10 concurrency
Dec 24, 2016
7331989
Test performance 1 processor 1 fetcher
Dec 24, 2016
93bb041
Stop logging out when 0 messages found.
Jan 12, 2017
f75a943
Clean up PutsReq tests
Feb 13, 2017
fed9d81
:lipstick:
Feb 13, 2017
c36d58f
Merge branch 'master' into thread-pool
Feb 13, 2017
7f18e02
Fix spec
Feb 13, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
.bundle
.config
.yardoc
Gemfile.lock
# Gemfile.lock
InstalledFiles
_yardoc
coverage
Expand Down
76 changes: 76 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
PATH
remote: .
specs:
shoryuken (2.1.1)
aws-sdk-core (~> 2)
concurrent-ruby

GEM
remote: https://rubygems.org/
specs:
aws-sdk-core (2.6.32)
aws-sigv4 (~> 1.0)
jmespath (~> 1.0)
aws-sigv4 (1.0.0)
byebug (9.0.5)
codeclimate-test-reporter (1.0.3)
simplecov
coderay (1.1.1)
concurrent-ruby (1.0.2)
diff-lcs (1.2.5)
docile (1.1.5)
dotenv (2.1.1)
jmespath (1.3.1)
json (2.0.2)
method_source (0.8.2)
mini_portile2 (2.1.0)
multi_xml (0.5.5)
nokogiri (1.6.8)
mini_portile2 (~> 2.1.0)
pkg-config (~> 1.1.7)
pkg-config (1.1.7)
pry (0.10.4)
coderay (~> 1.1.0)
method_source (~> 0.8.1)
slop (~> 3.4)
pry-byebug (3.4.0)
byebug (~> 9.0)
pry (~> 0.10)
rake (11.2.2)
rspec (3.5.0)
rspec-core (~> 3.5.0)
rspec-expectations (~> 3.5.0)
rspec-mocks (~> 3.5.0)
rspec-core (3.5.2)
rspec-support (~> 3.5.0)
rspec-expectations (3.5.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.5.0)
rspec-mocks (3.5.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.5.0)
rspec-support (3.5.0)
simplecov (0.12.0)
docile (~> 1.1.0)
json (>= 1.8, < 3)
simplecov-html (~> 0.10.0)
simplecov-html (0.10.0)
slop (3.6.0)

PLATFORMS
ruby

DEPENDENCIES
bundler (~> 1.6)
codeclimate-test-reporter
dotenv
multi_xml
nokogiri
pry-byebug
rake
rspec
shoryuken!
simplecov

BUNDLED WITH
1.13.6
2 changes: 2 additions & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
worker: bundle exec bin/shoryuken -r ./putsreq_worker.rb -q default

13 changes: 12 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ end
desc 'Open Shoryuken pry console'
task :console do
require 'pry'
require 'celluloid/current'
require 'shoryuken'

config_file = File.join File.expand_path('..', __FILE__), 'shoryuken.yml'
Expand All @@ -28,3 +27,15 @@ task :console do
ARGV.clear
Pry.start
end

desc 'Enqueue 1k messages'
task :enqueue_1k do
require 'shoryuken'

require File.join(File.expand_path('..', __FILE__), 'putsreq_worker')

1000.times do |i|
PutsReqWorker.perform_async(i.to_s)
print '.'
end
end
2 changes: 1 addition & 1 deletion examples/default_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ class DefaultWorker
shoryuken_options queue: 'default', auto_delete: true

def perform(sqs_msg, body)
Shoryuken.logger.info("Received message: '#{body}'")
Shoryuken.logger.debug("Received message: '#{body}'")
end
end
6 changes: 6 additions & 0 deletions lib/shoryuken.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'yaml'
require 'json'
require 'aws-sdk-core'
require 'time'
require 'concurrent'

require 'shoryuken/version'
require 'shoryuken/core_ext'
Expand All @@ -22,6 +24,10 @@
require 'shoryuken/sns_arn'
require 'shoryuken/topic'
require 'shoryuken/polling'
require 'shoryuken/manager'
require 'shoryuken/launcher'
require 'shoryuken/processor'
require 'shoryuken/fetcher'

module Shoryuken
DEFAULTS = {
Expand Down
36 changes: 9 additions & 27 deletions lib/shoryuken/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ class CLI
include Util
include Singleton

attr_accessor :launcher

def run(args)
self_read, self_write = IO.pipe

Expand All @@ -41,9 +39,8 @@ def run(args)

loader.load

load_celluloid
initialize_concurrent_logger

require 'shoryuken/launcher'
@launcher = Shoryuken::Launcher.new

if callback = Shoryuken.start_callback
Expand All @@ -54,43 +51,33 @@ def run(args)
fire_event(:startup)

begin
launcher.run
@launcher.run

while (readable_io = IO.select([self_read]))
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
launcher.stop(shutdown: true)
@launcher.stop(shutdown: true)
exit 0
end
end

private

def load_celluloid
require 'celluloid/current'
Celluloid.logger = (Shoryuken.options[:verbose] ? Shoryuken.logger : nil)

require 'shoryuken/manager'
end
def initialize_concurrent_logger
return unless Shoryuken.logger

def celluloid_loaded?
defined?(::Celluloid)
Concurrent.global_logger = lambda do |level, progname, msg = nil, &block|
Shoryuken.logger.log(level, msg, progname, &block)
end
end

def daemonize(options)
return unless options[:daemon]

fail ArgumentError, "You really should set a logfile if you're going to daemonize" unless options[:logfile]

if celluloid_loaded?
# Celluloid can't be loaded until after we've daemonized
# because it spins up threads and creates locks which get
# into a very bad state if forked.
raise "Celluloid cannot be required until here, or it will break Shoryuken's daemonization"
end

files_to_reopen = []
ObjectSpace.each_object(File) do |file|
files_to_reopen << file unless file.closed?
Expand Down Expand Up @@ -187,7 +174,7 @@ def handle_signal(sig)
when 'USR1'
logger.info { 'Received USR1, will soft shutdown down' }

launcher.stop
@launcher.stop
fire_event(:quiet, true)
exit 0
when 'TTIN'
Expand All @@ -200,11 +187,6 @@ def handle_signal(sig)
end
end

ready = launcher.manager.instance_variable_get(:@ready).size
busy = launcher.manager.instance_variable_get(:@busy).size
queues = launcher.manager.instance_variable_get(:@queues)

logger.info { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{unparse_queues(queues)}" }
else
logger.info { "Received #{sig}, will shutdown down" }

Expand Down
32 changes: 15 additions & 17 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,21 @@ class Fetcher
FETCH_LIMIT = 10

def fetch(queue, available_processors)
watchdog('Fetcher#fetch died') do
started_at = Time.now

logger.debug { "Looking for new messages in '#{queue}'" }

begin
limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors

sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" }
logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
sqs_msgs
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
[]
end
started_at = Time.now

logger.debug { "Looking for new messages in '#{queue}'" }

begin
limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors

sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an aside, have you considered not emitting this when the sqs_msgs are empty?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tjsingleton I haven't, but it seems to be a good idea. The only one issue is because the process will may look like isn't running/moving without that message, in case all queues are empty. WDYT?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this for awhile and as long as the process lifecycle has good log messages, I don't think it's typically expected to "heartbeat" via the log. For instance, do you trust that puma is up and processing requests despite that it's doesn't log?

logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
sqs_msgs
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
[]
end
end

Expand Down
39 changes: 13 additions & 26 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
@@ -1,42 +1,29 @@
module Shoryuken
class Launcher
include Celluloid
include Util

trap_exit :actor_died

attr_accessor :manager

def initialize
@condvar = Celluloid::Condition.new
@manager = Shoryuken::Manager.new_link(@condvar)
count = Shoryuken.options.fetch(:concurrency, 25)

@done = false
raise(ArgumentError, "Concurrency value #{count} is invalid, it needs to be a positive number") unless count > 0

manager.fetcher = Shoryuken::Fetcher.new
manager.polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues)
@managers = Array.new(count) do
Shoryuken::Manager.new(1,
Shoryuken::Fetcher.new,
Shoryuken.options[:polling_strategy].new(Shoryuken.queues))
end
end

def stop(options = {})
watchdog('Launcher#stop') do
@done = true

manager.async.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout])
@condvar.wait
manager.terminate
end
@managers.map do |manager|
Thread.new { manager.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout]) }
end.each(&:join)
end

def run
watchdog('Launcher#run') do
manager.async.start
end
end

def actor_died(actor, reason)
return if @done
logger.warn { "Shoryuken died due to the following error, cannot recover, process exiting: #{reason}" }
exit 1
@managers.map do |manager|
Thread.new { manager.start }
end.each(&:join)
end
end
end
Loading