Skip to content

Commit

Permalink
A manager every 10 concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Dec 24, 2016
1 parent 59bc99b commit 540b45a
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 9 deletions.
Binary file removed Thread-pool 0.05.png
Binary file not shown.
Binary file removed Thread-pool 0.10.png
Binary file not shown.
Binary file removed Thread-pool 0.15.png
Binary file not shown.
Binary file removed Thread-pool 0.25.png
Binary file not shown.
Binary file removed Thread-pool 1.png
Binary file not shown.
Binary file removed Thread-pool done & dispatch.png
Binary file not shown.
Binary file removed Thread-pool wait 10 seconds.png
Binary file not shown.
25 changes: 20 additions & 5 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,32 @@ class Launcher
include Util

def initialize
@manager = Shoryuken::Manager.new(Shoryuken::Fetcher.new,
Shoryuken.options[:polling_strategy].new(Shoryuken.queues))
count = Shoryuken.options.fetch(:concurrency, 25)

raise(ArgumentError, "Concurrency value #{count} is invalid, it needs to be a positive number") unless count > 0

manager_count = count / 10
manager_count = 1 if manager_count < 1

concurrency = count / manager_count

@managers = Array.new(manager_count) do
Shoryuken::Manager.new(concurrency,
Shoryuken::Fetcher.new,
Shoryuken.options[:polling_strategy].new(Shoryuken.queues))
end
end

def stop(options = {})
@manager.stop(shutdown: !!options[:shutdown],
timeout: Shoryuken.options[:timeout])
@managers.map do |manager|
Thread.new { manager.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout]) }
end.each(&:join)
end

def run
@manager.start
@managers.map do |manager|
Thread.new { manager.start }
end.each(&:join)
end
end
end
7 changes: 3 additions & 4 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ class Manager

BATCH_LIMIT = 10

def initialize(fetcher, polling_strategy)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0
def initialize(count, fetcher, polling_strategy)
@count = count

@queues = Shoryuken.queues.dup.uniq

Expand All @@ -16,7 +15,7 @@ def initialize(fetcher, polling_strategy)
@fetcher = fetcher
@polling_strategy = polling_strategy

@heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.15, timeout_interval: 60) { dispatch }
@heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.1, timeout_interval: 60) { dispatch }

@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count)
end
Expand Down

0 comments on commit 540b45a

Please sign in to comment.