-
-
Notifications
You must be signed in to change notification settings - Fork 280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stop queuing up heartbeat threads #345
Conversation
ddc901a
to
7baae10
Compare
lib/shoryuken/manager.rb
Outdated
@@ -19,8 +19,11 @@ def initialize(fetcher, polling_strategy) | |||
@polling_strategy = polling_strategy | |||
|
|||
@heartbeat = Concurrent::TimerTask.new(run_now: true, | |||
execution_interval: HEARTBEAT_INTERVAL, | |||
timeout_interval: 60) { dispatch } | |||
execution_interval: HEARTBEAT_INTERVAL) { @pool.post { dispatch } if @dispatching.false? } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waynerobinson I know it isn't the most beautiful solution in the world. But that fix the thread leaking, I still need to play more with that. But I think the "final" fix will be something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That will probably do it. It reuses the same worker pool for dispatching (which I don't really think is problematic) and the @pool.post { ... }
will return instantly, meaning the timeout never occurs (because of the implementation, you can't actually turn off the timeout code in TimerTask
).
And this still lets you use TimerTask
as a supervisor-like solution for dispatch, ensuring it stays executing, even if there is an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waynerobinson thanks for reviewing it 🍻
I've just changed it to use a separate and single thread pool.
I'm now running some performance tests, let's see how it goes 🙏
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
As long as the block finishes faster than the execution_timeout
then this leak won't occur, even if the currently broken state of concurrent-ruby
.
Now if only the team there could be convinced that there is a bug in the first place. 😜
lib/shoryuken/manager.rb
Outdated
|
||
Concurrent::TimerTask.new(execution_interval: 1) do | ||
Shoryuken.logger.info "Threads: #{Thread.list.size}" | ||
end.execute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waynerobinson With concurrency 10: bundle exec bin/shoryuken -c 10 -q test -r ./test.rb
, the thread size kept consistent at 16.
71a56e3
to
1cd46bf
Compare
lib/shoryuken/manager.rb
Outdated
|
||
@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count) | ||
@dispatcher_pool = Concurrent::SingleThreadExecutor.new |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As long as the block finishes faster than the execution_timeout then this leak won't occur, even if the currently broken state of concurrent-ruby.
@waynerobinson now as I'm using this SingleThreadExecutor
, it's no longer an issue, it supports only one thread at time and the fallback policy is discard. If we try to post
while there's a thread running, it will just discard the post
.
Now if only the team there could be convinced that there is a bug in the first place. 😜
I will try to reply on that issue as well. I think they should allow to configure a max_queue
and fallback_policy
for the TimerTask
, too bad they don't allow it, and it ends up having a max_queue: infinity
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except that TimerTask
is the thing that's leaking threads because of the ones it creates to do the timeout. But I'm sure the scheduling of a dispatch task occurs faster than the heartbeat period.
Don't really need a max_queue
in TimerTask
because the way the class is designed it should be locking around the actual task and only letting one operate at a time.
But the lack of a correct implementation for the timeout causes thread leaks in the timeout monitor if the task takes longer than the execution interval (it should be the timeout interval at the very least, but it never refers to this… hence the bug) to complete.
The current design intention of TimerTask
seems to have it limited to only ever using 2 threads.
lib/shoryuken/manager.rb
Outdated
@@ -3,7 +3,7 @@ class Manager | |||
include Util | |||
|
|||
BATCH_LIMIT = 10 | |||
HEARTBEAT_INTERVAL = 0.1 | |||
HEARTBEAT_INTERVAL = 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waynerobinson I'm testing HEARTBEAT_INTERVAL = 1
, as I'm also calling dispatch
in the processor_done
callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's probably going to cause issues with empty queues as it will take a least HEARTBEAT_INTERVAL
before it will request new messages.
Instead of this you could just have dispatch
run in a loop and rely on the TimerTask
to restart it if it ever dies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this:
def dispatch_now
while true
if ready.zero?
return unless (queue = @polling_strategy.next_queue)
logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
else
sleep MINIMUM_WAIT # 0.05 or something to prevent CPU pegging
end
end
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although instead of ready.zero?
, is there just some type of latch on the pool to await a free thread so you don't have to do the sleep?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although instead of ready.zero?, is there just some type of latch on the pool to await a free thread so you don't have to do the sleep?
@waynerobinson not I'm aware of 🐼
lib/shoryuken/manager.rb
Outdated
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) | ||
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) | ||
ensure | ||
dispatch_async |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sleep MINIMUM_WAIT # 0.05 or something to prevent CPU pegging
@waynerobinson I need to keep it running longer, but it seems to be working without a sleep
. As the @dispatcher_executor
discards post
when there's a thread running, the request for fetching is kind of doing sleep
, it should only be like without any sleep if all processors are busy or non queue is available as you pointed out. I will keep monitoring the CPU, if that does not work, we can def add sleep 0.05
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always use Receive Wait Time of > 0, so it will always block if the network is up anyway. The sleep is really just for worst case situations.
|
||
@fetcher = fetcher | ||
@polling_strategy = polling_strategy | ||
|
||
@heartbeat = Concurrent::TimerTask.new(run_now: true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waynerobinson I'm doing the loop
in a ensure
block, and also calling dispatch_async
when a processor done, just in case. So I don't think we need to the heartbeat anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if the dispatch_async
in processor_done
does anything given the normal dispatcher re-runs at the end anyway.
But the ensure and re-run should keep the dispatch loop running I think. 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waynerobinson you are right, maybe I was too overcautious. I added it in there just in case. But I'm considering removing it, ensure
should work.
Great work! 🍾 |
Fix #338